-
Notifications
You must be signed in to change notification settings - Fork 325
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[db] implement Get/Put/Delete/Version() for BoltDBVersioned #4256
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,9 +7,22 @@ | |
package db | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"fmt" | ||
"math" | ||
|
||
"github.com/pkg/errors" | ||
bolt "go.etcd.io/bbolt" | ||
|
||
"github.com/iotexproject/iotex-core/db/batch" | ||
"github.com/iotexproject/iotex-core/pkg/lifecycle" | ||
"github.com/iotexproject/iotex-core/pkg/util/byteutil" | ||
) | ||
|
||
var ( | ||
ErrDeleted = errors.New("deleted in DB") | ||
_minKey = []byte{0} // the minimum key, used to store namespace's metadata | ||
) | ||
|
||
type ( | ||
|
@@ -26,7 +39,7 @@ type ( | |
// 1. the version when the key is first created | ||
// 2. the version when the key is lastly written | ||
// 3. the version when the key is deleted | ||
// 4. hash of the key's last written value (to detect/avoid same write) | ||
// 4. the key's last written value (to fast-track read of current version) | ||
// If the location does not store a value, the key has never been written. | ||
// | ||
// How to use a versioned DB: | ||
|
@@ -43,11 +56,14 @@ type ( | |
KvVersioned interface { | ||
lifecycle.StartStopper | ||
|
||
// Base returns the underlying KVStore | ||
Base() KVStore | ||
|
||
// Version returns the key's most recent version | ||
Version(string, []byte) (uint64, error) | ||
|
||
// SetVersion sets the version, and returns a KVStore to call Put()/Get() | ||
SetVersion(uint64) KVStoreBasic | ||
SetVersion(uint64) KVStore | ||
} | ||
|
||
// BoltDBVersioned is KvVersioned implementation based on bolt DB | ||
|
@@ -80,50 +96,192 @@ func (b *BoltDBVersioned) Stop(ctx context.Context) error { | |
return b.db.Stop(ctx) | ||
} | ||
|
||
// Base returns the underlying KVStore | ||
func (b *BoltDBVersioned) Base() KVStore { | ||
return b.db | ||
} | ||
|
||
// Put writes a <key, value> record | ||
func (b *BoltDBVersioned) Put(ns string, version uint64, key, value []byte) error { | ||
func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) error { | ||
if !b.db.IsReady() { | ||
return ErrDBNotStarted | ||
} | ||
// TODO: implement Put | ||
return nil | ||
// check namespace | ||
vn, err := b.checkNamespace(ns) | ||
if err != nil { | ||
return err | ||
} | ||
buf := batch.NewBatch() | ||
if vn == nil { | ||
// namespace not yet created | ||
buf.Put(ns, _minKey, (&versionedNamespace{ | ||
keyLen: uint32(len(key)), | ||
}).serialize(), "failed to create metadata") | ||
} else { | ||
if len(key) != int(vn.keyLen) { | ||
return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key)) | ||
} | ||
last, _, err := b.get(math.MaxUint64, ns, key) | ||
if !isNotExist(err) && version < last { | ||
// not allowed to perform write on an earlier version | ||
return ErrInvalid | ||
} | ||
buf.Delete(ns, keyForDelete(key, version), fmt.Sprintf("failed to delete key %x", key)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why delete? |
||
} | ||
buf.Put(ns, keyForWrite(key, version), value, fmt.Sprintf("failed to put key %x", key)) | ||
return b.db.WriteBatch(buf) | ||
} | ||
|
||
// Get retrieves the most recent version | ||
func (b *BoltDBVersioned) Get(ns string, version uint64, key []byte) ([]byte, error) { | ||
func (b *BoltDBVersioned) Get(version uint64, ns string, key []byte) ([]byte, error) { | ||
if !b.db.IsReady() { | ||
return nil, ErrDBNotStarted | ||
} | ||
// TODO: implement Get | ||
return nil, nil | ||
// check key's metadata | ||
if err := b.checkNamespaceAndKey(ns, key); err != nil { | ||
return nil, err | ||
} | ||
_, v, err := b.get(version, ns, key) | ||
return v, err | ||
} | ||
|
||
// Delete deletes a record,if key is nil,this will delete the whole bucket | ||
func (b *BoltDBVersioned) Delete(ns string, key []byte) error { | ||
func (b *BoltDBVersioned) get(version uint64, ns string, key []byte) (uint64, []byte, error) { | ||
var ( | ||
last uint64 | ||
isDelete bool | ||
minimum = keyForDelete(key, 0) | ||
value []byte | ||
) | ||
key = keyForWrite(key, version) | ||
err := b.db.db.View(func(tx *bolt.Tx) error { | ||
bucket := tx.Bucket([]byte(ns)) | ||
if bucket == nil { | ||
return ErrBucketNotExist | ||
} | ||
c := bucket.Cursor() | ||
k, v := c.Seek(key) | ||
if k == nil || bytes.Compare(k, key) == 1 { | ||
k, v = c.Prev() | ||
if k == nil || bytes.Compare(k, minimum) <= 0 { | ||
// cursor is at the beginning/end of the bucket or smaller than minimum key | ||
return ErrNotExist | ||
} | ||
} | ||
isDelete, last = parseKey(k) | ||
value = make([]byte, len(v)) | ||
copy(value, v) | ||
return nil | ||
}) | ||
if err != nil { | ||
return last, nil, err | ||
} | ||
if isDelete { | ||
return last, nil, ErrDeleted | ||
} | ||
return last, value, nil | ||
} | ||
|
||
// Delete deletes a record, if key does not exist, it returns nil | ||
func (b *BoltDBVersioned) Delete(version uint64, ns string, key []byte) error { | ||
if !b.db.IsReady() { | ||
return ErrDBNotStarted | ||
} | ||
// TODO: implement Delete | ||
return nil | ||
// check key's metadata | ||
if err := b.checkNamespaceAndKey(ns, key); err != nil { | ||
return err | ||
} | ||
last, _, err := b.get(math.MaxUint64, ns, key) | ||
if isNotExist(err) { | ||
return err | ||
} | ||
if version < last { | ||
// not allowed to perform delete on an earlier version | ||
return ErrInvalid | ||
} | ||
buf := batch.NewBatch() | ||
buf.Put(ns, keyForDelete(key, version), nil, fmt.Sprintf("failed to delete key %x", key)) | ||
buf.Delete(ns, keyForWrite(key, version), fmt.Sprintf("failed to delete key %x", key)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why delete write key? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. discussed offline
|
||
return b.db.WriteBatch(buf) | ||
} | ||
|
||
// Version returns the key's most recent version | ||
func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) { | ||
if !b.db.IsReady() { | ||
return 0, ErrDBNotStarted | ||
} | ||
// TODO: implement Version | ||
return 0, nil | ||
// check key's metadata | ||
if err := b.checkNamespaceAndKey(ns, key); err != nil { | ||
return 0, err | ||
} | ||
last, _, err := b.get(math.MaxUint64, ns, key) | ||
if isNotExist(err) { | ||
// key not yet written | ||
err = errors.Wrapf(ErrNotExist, "key = %x doesn't exist", key) | ||
} | ||
return last, err | ||
} | ||
|
||
// SetVersion sets the version, and returns a KVStore to call Put()/Get() | ||
func (b *BoltDBVersioned) SetVersion(v uint64) KVStoreBasic { | ||
func (b *BoltDBVersioned) SetVersion(v uint64) KVStore { | ||
return &KvWithVersion{ | ||
db: b, | ||
version: v, | ||
} | ||
} | ||
|
||
func isNotExist(err error) bool { | ||
return err == ErrNotExist || err == ErrBucketNotExist | ||
} | ||
|
||
func keyForWrite(key []byte, v uint64) []byte { | ||
k := make([]byte, len(key), len(key)+9) | ||
copy(k, key) | ||
k = append(k, byteutil.Uint64ToBytesBigEndian(v)...) | ||
return append(k, 1) | ||
} | ||
|
||
func keyForDelete(key []byte, v uint64) []byte { | ||
k := make([]byte, len(key), len(key)+9) | ||
copy(k, key) | ||
k = append(k, byteutil.Uint64ToBytesBigEndian(v)...) | ||
return append(k, 0) | ||
} | ||
|
||
func parseKey(key []byte) (bool, uint64) { | ||
size := len(key) | ||
return (key[size-1] == 0), byteutil.BytesToUint64BigEndian(key[size-9 : size-1]) | ||
} | ||
|
||
func (b *BoltDBVersioned) checkNamespace(ns string) (*versionedNamespace, error) { | ||
data, err := b.db.Get(ns, _minKey) | ||
switch errors.Cause(err) { | ||
case nil: | ||
vn, err := deserializeVersionedNamespace(data) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return vn, nil | ||
case ErrNotExist, ErrBucketNotExist: | ||
return nil, nil | ||
default: | ||
return nil, err | ||
} | ||
} | ||
|
||
func (b *BoltDBVersioned) checkNamespaceAndKey(ns string, key []byte) error { | ||
vn, err := b.checkNamespace(ns) | ||
if err != nil { | ||
return err | ||
} | ||
if vn == nil { | ||
return errors.Wrapf(ErrNotExist, "namespace = %x doesn't exist", ns) | ||
} | ||
if len(key) != int(vn.keyLen) { | ||
return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key)) | ||
} | ||
return nil | ||
} | ||
|
||
// KvWithVersion wraps the BoltDBVersioned with a certain version | ||
type KvWithVersion struct { | ||
db *BoltDBVersioned | ||
|
@@ -142,15 +300,26 @@ func (b *KvWithVersion) Stop(context.Context) error { | |
|
||
// Put writes a <key, value> record | ||
func (b *KvWithVersion) Put(ns string, key, value []byte) error { | ||
return b.db.Put(ns, b.version, key, value) | ||
return b.db.Put(b.version, ns, key, value) | ||
} | ||
|
||
// Get retrieves a key's value | ||
func (b *KvWithVersion) Get(ns string, key []byte) ([]byte, error) { | ||
return b.db.Get(ns, b.version, key) | ||
return b.db.Get(b.version, ns, key) | ||
} | ||
|
||
// Delete deletes a key | ||
func (b *KvWithVersion) Delete(ns string, key []byte) error { | ||
return b.db.Delete(ns, key) | ||
return b.db.Delete(b.version, ns, key) | ||
} | ||
|
||
// Filter returns <k, v> pair in a bucket that meet the condition | ||
func (b *KvWithVersion) Filter(ns string, cond Condition, minKey, maxKey []byte) ([][]byte, [][]byte, error) { | ||
panic("Filter not supported for versioned DB") | ||
} | ||
|
||
// WriteBatch commits a batch | ||
func (b *KvWithVersion) WriteBatch(kvsb batch.KVStoreBatch) error { | ||
// TODO: implement WriteBatch | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need to implement these single function
Put/Delete
? I prefer only keepWriteBatch
, b/c version update will be atomic, and also can simplify the implThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussed offline, it does not hurt to keep it