Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[db] implement Get/Put/Delete/Version() for BoltDBVersioned #4256

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 187 additions & 18 deletions db/db_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,22 @@
package db

import (
"bytes"
"context"
"fmt"
"math"

"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"

"github.com/iotexproject/iotex-core/db/batch"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/util/byteutil"
)

var (
// ErrDeleted is returned by a versioned read when the entry found for the key is a delete marker
ErrDeleted = errors.New("deleted in DB")
_minKey = []byte{0} // the minimum key, used to store namespace's metadata
)

type (
Expand All @@ -26,7 +39,7 @@ type (
// 1. the version when the key is first created
// 2. the version when the key is lastly written
// 3. the version when the key is deleted
// 4. hash of the key's last written value (to detect/avoid same write)
// 4. the key's last written value (to fast-track read of current version)
// If the location does not store a value, the key has never been written.
//
// How to use a versioned DB:
Expand All @@ -43,11 +56,14 @@ type (
KvVersioned interface {
lifecycle.StartStopper

// Base returns the underlying KVStore
Base() KVStore

// Version returns the key's most recent version
Version(string, []byte) (uint64, error)

// SetVersion sets the version, and returns a KVStore to call Put()/Get()
SetVersion(uint64) KVStoreBasic
SetVersion(uint64) KVStore
}

// BoltDBVersioned is KvVersioned implementation based on bolt DB
Expand Down Expand Up @@ -80,50 +96,192 @@ func (b *BoltDBVersioned) Stop(ctx context.Context) error {
return b.db.Stop(ctx)
}

// Base returns the underlying KVStore, i.e. the b.db store wrapped by this versioned DB
func (b *BoltDBVersioned) Base() KVStore {
return b.db
}

// Put writes a <key, value> record
func (b *BoltDBVersioned) Put(ns string, version uint64, key, value []byte) error {
func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) error {
Copy link
Member

@envestcc envestcc Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to implement the single-operation functions Put/Delete? I'd prefer to keep only WriteBatch, because the version update will then be atomic, and it also simplifies the implementation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline, it does not hurt to keep it

if !b.db.IsReady() {
return ErrDBNotStarted
}
// TODO: implement Put
return nil
// check namespace
vn, err := b.checkNamespace(ns)
if err != nil {
return err
}
buf := batch.NewBatch()
if vn == nil {
// namespace not yet created
buf.Put(ns, _minKey, (&versionedNamespace{
keyLen: uint32(len(key)),
}).serialize(), "failed to create metadata")
} else {
if len(key) != int(vn.keyLen) {
return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key))
}
last, _, err := b.get(math.MaxUint64, ns, key)
if !isNotExist(err) && version < last {
// not allowed to perform write on an earlier version
return ErrInvalid
}
buf.Delete(ns, keyForDelete(key, version), fmt.Sprintf("failed to delete key %x", key))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why delete?

}
buf.Put(ns, keyForWrite(key, version), value, fmt.Sprintf("failed to put key %x", key))
return b.db.WriteBatch(buf)
}

// Get retrieves the key's value as of the given version
func (b *BoltDBVersioned) Get(ns string, version uint64, key []byte) ([]byte, error) {
func (b *BoltDBVersioned) Get(version uint64, ns string, key []byte) ([]byte, error) {
if !b.db.IsReady() {
return nil, ErrDBNotStarted
}
// TODO: implement Get
return nil, nil
// check key's metadata
if err := b.checkNamespaceAndKey(ns, key); err != nil {
return nil, err
}
_, v, err := b.get(version, ns, key)
return v, err
}

// Delete deletes a record,if key is nil,this will delete the whole bucket
func (b *BoltDBVersioned) Delete(ns string, key []byte) error {
// get looks up the entry of key that is in effect at the given version, i.e. the
// stored entry with the largest version that is <= version.
//
// It returns the version of the entry found and, for a write entry, a copy of its value.
// Errors:
//   - ErrBucketNotExist if the namespace bucket does not exist
//   - ErrNotExist if no entry of the key is found at or before version
//   - ErrDeleted if the entry found is a delete marker (the returned version is
//     the version of that delete marker)
//
// When the lookup itself fails, the returned version is 0 since it is never assigned.
func (b *BoltDBVersioned) get(version uint64, ns string, key []byte) (uint64, []byte, error) {
var (
last uint64
isDelete bool
// lower bound for this key: version 0 with the delete suffix (0)
minimum = keyForDelete(key, 0)
value []byte
)
// search target: the write entry (suffix 1) at the requested version. The version is
// encoded big-endian, so byte order of the stored keys follows version order, and for
// the same version the delete entry (suffix 0) sorts right before the write entry
key = keyForWrite(key, version)
err := b.db.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(ns))
if bucket == nil {
return ErrBucketNotExist
}
c := bucket.Cursor()
// per bbolt's Cursor documentation, Seek lands on the target key or, if absent,
// on the next key after it (nil if there is none)
k, v := c.Seek(key)
if k == nil || bytes.Compare(k, key) == 1 {
// no exact match: step back to the closest entry before the target
k, v = c.Prev()
if k == nil || bytes.Compare(k, minimum) <= 0 {
// cursor is at the beginning/end of the bucket or smaller than minimum key
// NOTE(review): an entry equal to minimum (delete marker at version 0) is
// also reported as not-exist here because of the <= comparison
// NOTE(review): treating anything above minimum as an entry of this key
// presumably relies on all keys in the namespace having the same length
return ErrNotExist
}
}
isDelete, last = parseKey(k)
// copy the value out of the transaction; bbolt documents that key/value slices
// are only valid for the life of the transaction
value = make([]byte, len(v))
copy(value, v)
return nil
})
if err != nil {
return last, nil, err
}
if isDelete {
return last, nil, ErrDeleted
}
return last, value, nil
}

// Delete marks the key as deleted at the given version; if the key does not exist, the not-exist error is returned
func (b *BoltDBVersioned) Delete(version uint64, ns string, key []byte) error {
if !b.db.IsReady() {
return ErrDBNotStarted
}
// TODO: implement Delete
return nil
// check key's metadata
if err := b.checkNamespaceAndKey(ns, key); err != nil {
return err
}
last, _, err := b.get(math.MaxUint64, ns, key)
if isNotExist(err) {
return err
}
if version < last {
// not allowed to perform delete on an earlier version
return ErrInvalid
}
buf := batch.NewBatch()
buf.Put(ns, keyForDelete(key, version), nil, fmt.Sprintf("failed to delete key %x", key))
buf.Delete(ns, keyForWrite(key, version), fmt.Sprintf("failed to delete key %x", key))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why delete write key?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline

  1. It is to keep only the last operation on the key at a given version.
  2. I recall that in the original POC we had cases where the same key was deleted and then written in the same batch, at around height 400k. So deleting and then writing at the same height is a valid request. We'll allow this behavior (and also writing then deleting at the same height).

return b.db.WriteBatch(buf)
}

// Version returns the key's most recent version
func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) {
if !b.db.IsReady() {
return 0, ErrDBNotStarted
}
// TODO: implement Version
return 0, nil
// check key's metadata
if err := b.checkNamespaceAndKey(ns, key); err != nil {
return 0, err
}
last, _, err := b.get(math.MaxUint64, ns, key)
if isNotExist(err) {
// key not yet written
err = errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
}
return last, err
}

// SetVersion sets the version, and returns a KVStore to call Put()/Get()
func (b *BoltDBVersioned) SetVersion(v uint64) KVStoreBasic {
func (b *BoltDBVersioned) SetVersion(v uint64) KVStore {
return &KvWithVersion{
db: b,
version: v,
}
}

// isNotExist reports whether err means that the key, or the bucket holding it,
// does not exist.
//
// The error chain is inspected with errors.Is, so an ErrNotExist or
// ErrBucketNotExist that has been annotated (e.g. with errors.Wrapf) is
// recognized as well as the bare sentinel. A nil err yields false.
func isNotExist(err error) bool {
	return errors.Is(err, ErrNotExist) || errors.Is(err, ErrBucketNotExist)
}

// keyForWrite builds the storage key of a write entry: the user key, followed
// by the version as big-endian bytes, followed by a trailing 1 that marks the
// entry as a write. The input key is copied, never modified.
func keyForWrite(key []byte, v uint64) []byte {
	out := make([]byte, 0, len(key)+9)
	out = append(out, key...)
	out = append(out, byteutil.Uint64ToBytesBigEndian(v)...)
	out = append(out, 1)
	return out
}

// keyForDelete builds the storage key of a delete marker: the user key,
// followed by the version as big-endian bytes, followed by a trailing 0 that
// marks the entry as a delete. The input key is copied, never modified.
func keyForDelete(key []byte, v uint64) []byte {
	out := make([]byte, 0, len(key)+9)
	out = append(out, key...)
	out = append(out, byteutil.Uint64ToBytesBigEndian(v)...)
	out = append(out, 0)
	return out
}

// parseKey splits the suffix off a storage key. It returns whether the entry
// is a delete marker (last byte is 0) and the version decoded from the 8
// big-endian bytes preceding that last byte.
// The key must carry the full 9-byte suffix; a shorter key panics.
func parseKey(key []byte) (bool, uint64) {
	n := len(key)
	deleted := key[n-1] == 0
	version := byteutil.BytesToUint64BigEndian(key[n-9 : n-1])
	return deleted, version
}

// checkNamespace loads the metadata of a namespace, which is stored under
// _minKey in the namespace's bucket.
//
// It returns (nil, nil) when the metadata (or the bucket) does not exist yet,
// the decoded metadata when it does, and an error if the read or the decoding
// fails.
func (b *BoltDBVersioned) checkNamespace(ns string) (*versionedNamespace, error) {
	raw, err := b.db.Get(ns, _minKey)
	cause := errors.Cause(err)
	if cause == ErrNotExist || cause == ErrBucketNotExist {
		// namespace has not been created yet, not an error
		return nil, nil
	}
	if cause != nil {
		return nil, err
	}
	meta, err := deserializeVersionedNamespace(raw)
	if err != nil {
		return nil, err
	}
	return meta, nil
}

// checkNamespaceAndKey verifies that the namespace has metadata and that key
// has the length recorded in it.
//
// It returns an ErrNotExist-wrapped error if the namespace has no metadata,
// an ErrInvalid-wrapped error if the key length does not match, and any error
// hit while loading the metadata.
func (b *BoltDBVersioned) checkNamespaceAndKey(ns string, key []byte) error {
	meta, err := b.checkNamespace(ns)
	switch {
	case err != nil:
		return err
	case meta == nil:
		return errors.Wrapf(ErrNotExist, "namespace = %x doesn't exist", ns)
	case len(key) != int(meta.keyLen):
		return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", meta.keyLen, len(key))
	default:
		return nil
	}
}

// KvWithVersion wraps the BoltDBVersioned with a certain version
type KvWithVersion struct {
db *BoltDBVersioned
Expand All @@ -142,15 +300,26 @@ func (b *KvWithVersion) Stop(context.Context) error {

// Put writes a <key, value> record
func (b *KvWithVersion) Put(ns string, key, value []byte) error {
return b.db.Put(ns, b.version, key, value)
return b.db.Put(b.version, ns, key, value)
}

// Get retrieves a key's value
func (b *KvWithVersion) Get(ns string, key []byte) ([]byte, error) {
return b.db.Get(ns, b.version, key)
return b.db.Get(b.version, ns, key)
}

// Delete deletes a key
func (b *KvWithVersion) Delete(ns string, key []byte) error {
return b.db.Delete(ns, key)
return b.db.Delete(b.version, ns, key)
}

// Filter returns <k, v> pair in a bucket that meet the condition
// It is not supported for versioned DB: the call always panics and all arguments are ignored
func (b *KvWithVersion) Filter(ns string, cond Condition, minKey, maxKey []byte) ([][]byte, [][]byte, error) {
panic("Filter not supported for versioned DB")
}

// WriteBatch commits a batch
// NOTE: not implemented yet, the batch is ignored (nothing is written) and nil is returned,
// so a caller cannot tell from the result that its writes were dropped
func (b *KvWithVersion) WriteBatch(kvsb batch.KVStoreBatch) error {
// TODO: implement WriteBatch
return nil
}
Loading
Loading