Skip to content

Commit

Permalink
[db] implement Get/Put/Delete/Version() for BoltDBVersioned
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed May 2, 2024
1 parent 1110804 commit 628bb2d
Show file tree
Hide file tree
Showing 6 changed files with 647 additions and 61 deletions.
207 changes: 189 additions & 18 deletions db/db_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,21 @@
package db

import (
"bytes"
"context"
"fmt"

"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"

"github.com/iotexproject/iotex-core/db/batch"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/util/byteutil"
)

var (
ErrDeleted = errors.New("deleted in DB")
_minKey = []byte{0} // the minimum key, used to store namespace's metadata
)

type (
Expand All @@ -26,7 +38,7 @@ type (
// 1. the version when the key is first created
// 2. the version when the key is lastly written
// 3. the version when the key is deleted
// 4. hash of the key's last written value (to detect/avoid same write)
// 4. the key's last written value (to fast-track read of current version)
// If the location does not store a value, the key has never been written.
//
// How to use a versioned DB:
Expand All @@ -43,11 +55,14 @@ type (
KvVersioned interface {
lifecycle.StartStopper

// Base returns the underlying KVStore
Base() KVStore

// Version returns the key's most recent version
Version(string, []byte) (uint64, error)

// SetVersion sets the version, and returns a KVStore to call Put()/Get()
SetVersion(uint64) KVStoreBasic
SetVersion(uint64) KVStore
}

// BoltDBVersioned is KvVersioned implementation based on bolt DB
Expand Down Expand Up @@ -80,50 +95,195 @@ func (b *BoltDBVersioned) Stop(ctx context.Context) error {
return b.db.Stop(ctx)
}

// Base returns the underlying KVStore
func (b *BoltDBVersioned) Base() KVStore {
return b.db

Check warning on line 100 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L99-L100

Added lines #L99 - L100 were not covered by tests
}

// Put writes a <key, value> record
func (b *BoltDBVersioned) Put(ns string, version uint64, key, value []byte) error {
func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) error {
if !b.db.IsReady() {
return ErrDBNotStarted
}
// TODO: implement Put
return nil
// check namespace
vn, err := b.checkNamespace(ns)
if err != nil {
return err
}
buf := batch.NewBatch()
if vn == nil {
// namespace not yet created
buf.Put(ns, _minKey, (&versionedNamespace{
keyLen: uint32(len(key)),
}).serialize(), "failed to create metadata")
} else {
if len(key) != int(vn.keyLen) {
return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key))
}
}
// check key's metadata
km, err := b.checkKey(ns, key)
if err != nil {
return err
}
km, exit := km.updateWrite(version, value)
if exit {
return nil
}
buf.Put(ns, append(key, 0), km.serialize(), fmt.Sprintf("failed to put key %x's metadata", key))
buf.Put(ns, versionedKey(key, version), value, fmt.Sprintf("failed to put key %x", key))
return b.db.WriteBatch(buf)
}

// Get retrieves the most recent version
func (b *BoltDBVersioned) Get(ns string, version uint64, key []byte) ([]byte, error) {
func (b *BoltDBVersioned) Get(version uint64, ns string, key []byte) ([]byte, error) {
if !b.db.IsReady() {
return nil, ErrDBNotStarted
}
// TODO: implement Get
return nil, nil
// check key's metadata
km, err := b.checkNamespaceAndKey(ns, key)
if err != nil {
return nil, err
}
hitLast, err := km.updateRead(version)
if err != nil {
return nil, errors.Wrapf(err, "key = %x", key)
}
if hitLast {
return km.lastWrite, nil
}
return b.get(version, ns, key)
}

func (b *BoltDBVersioned) get(version uint64, ns string, key []byte) ([]byte, error) {
// construct the actual key = key + version (represented in 8-bytes)
// and read from DB
key = versionedKey(key, version)
var value []byte
err := b.db.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(ns))
if bucket == nil {
return errors.Wrapf(ErrBucketNotExist, "bucket = %x doesn't exist", []byte(ns))

Check warning on line 166 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L166

Added line #L166 was not covered by tests
}
c := bucket.Cursor()
k, v := c.Seek(key)
if k == nil || bytes.Compare(k, key) == 1 {
k, v = c.Prev()
if k == nil || bytes.Compare(k, append(key[:len(key)-8], 0)) <= 0 {
// cursor is at the beginning/end of the bucket or smaller than minimum key
panic(fmt.Sprintf("BoltDBVersioned.get(), invalid key = %x", key))
}
}
value = make([]byte, len(v))
copy(value, v)
return nil
})
if err == nil {
return value, nil
}
if cause := errors.Cause(err); cause == ErrNotExist || cause == ErrBucketNotExist {
return nil, err

Check warning on line 185 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L184-L185

Added lines #L184 - L185 were not covered by tests
}
return nil, errors.Wrap(ErrIO, err.Error())

Check warning on line 187 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L187

Added line #L187 was not covered by tests
}

// Delete deletes a record,if key is nil,this will delete the whole bucket
func (b *BoltDBVersioned) Delete(ns string, key []byte) error {
// Delete deletes a record, if key does not exist, it returns nil
func (b *BoltDBVersioned) Delete(version uint64, ns string, key []byte) error {
if !b.db.IsReady() {
return ErrDBNotStarted
}
// TODO: implement Delete
return nil
// check key's metadata
km, err := b.checkNamespaceAndKey(ns, key)
if err != nil {
if cause := errors.Cause(err); cause != ErrNotExist && cause != ErrInvalid {
return err
}
}
if km == nil || version < km.lastVersion || version <= km.deleteVersion {
return nil
}
km.deleteVersion = version
return b.db.Put(ns, append(key, 0), km.serialize())
}

// Version returns the key's most recent version
func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) {
if !b.db.IsReady() {
return 0, ErrDBNotStarted
}
// TODO: implement Version
return 0, nil
// check key's metadata
km, err := b.checkNamespaceAndKey(ns, key)
if err != nil {
return 0, err
}
if km == nil {
// key not yet written
return 0, errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
}
if km.deleteVersion != 0 {
err = errors.Wrapf(ErrDeleted, "key = %x already deleted", key)

Check warning on line 224 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L224

Added line #L224 was not covered by tests
}
return km.lastVersion, err
}

// SetVersion sets the version, and returns a KVStore to call Put()/Get()
func (b *BoltDBVersioned) SetVersion(v uint64) KVStoreBasic {
func (b *BoltDBVersioned) SetVersion(v uint64) KVStore {

Check warning on line 230 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L230

Added line #L230 was not covered by tests
return &KvWithVersion{
db: b,
version: v,
}
}

func versionedKey(key []byte, v uint64) []byte {
return append(key, byteutil.Uint64ToBytesBigEndian(v)...)
}

func (b *BoltDBVersioned) checkNamespace(ns string) (*versionedNamespace, error) {
data, err := b.db.Get(ns, _minKey)
switch errors.Cause(err) {
case nil:
vn, err := deserializeVersionedNamespace(data)
if err != nil {
return nil, err
}
return vn, nil
case ErrNotExist, ErrBucketNotExist:
return nil, nil
default:
return nil, err

Check warning on line 253 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L252-L253

Added lines #L252 - L253 were not covered by tests
}
}

func (b *BoltDBVersioned) checkKey(ns string, key []byte) (*keyMeta, error) {
data, err := b.db.Get(ns, append(key, 0))
switch errors.Cause(err) {
case nil:
km, err := deserializeKeyMeta(data)
if err != nil {
return nil, err
}
return km, nil
case ErrNotExist, ErrBucketNotExist:
return nil, nil
default:
return nil, err

Check warning on line 269 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L268-L269

Added lines #L268 - L269 were not covered by tests
}
}

func (b *BoltDBVersioned) checkNamespaceAndKey(ns string, key []byte) (*keyMeta, error) {
vn, err := b.checkNamespace(ns)
if err != nil {
return nil, err
}
if vn == nil {
return nil, errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key)
}
if len(key) != int(vn.keyLen) {
return nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key))
}
return b.checkKey(ns, key)
}

// KvWithVersion wraps the BoltDBVersioned with a certain version
type KvWithVersion struct {
db *BoltDBVersioned
Expand All @@ -142,15 +302,26 @@ func (b *KvWithVersion) Stop(context.Context) error {

// Put writes a <key, value> record
func (b *KvWithVersion) Put(ns string, key, value []byte) error {
return b.db.Put(ns, b.version, key, value)
return b.db.Put(b.version, ns, key, value)

Check warning on line 305 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L305

Added line #L305 was not covered by tests
}

// Get retrieves a key's value
func (b *KvWithVersion) Get(ns string, key []byte) ([]byte, error) {
return b.db.Get(ns, b.version, key)
return b.db.Get(b.version, ns, key)

Check warning on line 310 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L310

Added line #L310 was not covered by tests
}

// Delete deletes a key
func (b *KvWithVersion) Delete(ns string, key []byte) error {
return b.db.Delete(ns, key)
return b.db.Delete(b.version, ns, key)

Check warning on line 315 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L315

Added line #L315 was not covered by tests
}

// Filter returns <k, v> pair in a bucket that meet the condition
func (b *KvWithVersion) Filter(ns string, cond Condition, minKey, maxKey []byte) ([][]byte, [][]byte, error) {
panic("Filter not supported for versioned DB")
}

// WriteBatch commits a batch
func (b *KvWithVersion) WriteBatch(kvsb batch.KVStoreBatch) error {

Check warning on line 324 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L324

Added line #L324 was not covered by tests
// TODO: implement WriteBatch
return nil

Check warning on line 326 in db/db_versioned.go

View check run for this annotation

Codecov / codecov/patch

db/db_versioned.go#L326

Added line #L326 was not covered by tests
}
Loading

0 comments on commit 628bb2d

Please sign in to comment.