diff --git a/docs/fabric/fabricdev/core/fabricdev/vault/vault.go b/docs/fabric/fabricdev/core/fabricdev/vault/vault.go index e75f77114..71cda6d9d 100644 --- a/docs/fabric/fabricdev/core/fabricdev/vault/vault.go +++ b/docs/fabric/fabricdev/core/fabricdev/vault/vault.go @@ -34,6 +34,7 @@ func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProv &populator{}, metricsProvider, tracerProvider, + &CounterBasedVersionBuilder{}, ) } @@ -45,6 +46,7 @@ func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDSt txID, &fdriver.ValidationCodeProvider{}, &marshaller{}, + &CounterBasedVersionComparator{}, ) } diff --git a/docs/fabric/fabricdev/core/fabricdev/vault/version.go b/docs/fabric/fabricdev/core/fabricdev/vault/version.go new file mode 100644 index 000000000..8db8121bf --- /dev/null +++ b/docs/fabric/fabricdev/core/fabricdev/vault/version.go @@ -0,0 +1,83 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package vault + +import ( + "bytes" + "encoding/binary" + + "github.com/hyperledger-labs/fabric-smart-client/platform/common/core/generic/vault" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" + "github.com/pkg/errors" +) + +type CounterBasedVersionBuilder struct{} + +func (c *CounterBasedVersionBuilder) VersionedValues(rws *vault.ReadWriteSet, ns driver.Namespace, writes vault.NamespaceWrites, block driver.BlockNum, indexInBloc driver.TxNum) (map[driver.PKey]vault.VersionedValue, error) { + vals := make(map[driver.PKey]vault.VersionedValue, len(writes)) + reads := rws.Reads[ns] + + for pkey, val := range writes { + v, err := version(reads, pkey) + if err != nil { + return nil, err + } + vals[pkey] = vault.VersionedValue{Raw: val, Version: v} + } + return vals, nil +} + +func version(reads vault.NamespaceReads, pkey driver.PKey) (vault.Version, error) { + // Search the corresponding read. + v, ok := reads[pkey] + if !ok { + // this is a blind write, we should check the vault. + // Let's assume here that a blind write always starts from version 0 + return Marshal(0), nil + } + + // parse the version as an integer, then increment it + counter, err := Unmarshal(v) + if err != nil { + return nil, errors.Wrapf(err, "failed unmarshalling version for %s:%v", pkey, v) + } + return Marshal(counter + 1), nil +} + +func (c *CounterBasedVersionBuilder) VersionedMetaValues(rws *vault.ReadWriteSet, ns driver.Namespace, writes vault.KeyedMetaWrites, block driver.BlockNum, indexInBloc driver.TxNum) (map[driver.PKey]driver.VersionedMetadataValue, error) { + vals := make(map[driver.PKey]driver.VersionedMetadataValue, len(writes)) + reads := rws.Reads[ns] + + for pkey, val := range writes { + v, err := version(reads, pkey) + if err != nil { + return nil, err + } + + vals[pkey] = driver.VersionedMetadataValue{Metadata: val, Version: v} + } + return vals, nil +} + +type CounterBasedVersionComparator struct{} + +func (c *CounterBasedVersionComparator) Equal(v1, v2 driver.RawVersion) bool { + return bytes.Equal(v1, v2) +} + +func Marshal(v uint32) []byte { + buf := make([]byte, 4) + binary.BigEndian.PutUint32(buf[:4], v) + return buf +} + +func Unmarshal(raw []byte) (uint32, error) { + if len(raw) != 4 { + return 0, errors.Errorf("invalid version, expected 4 bytes, got [%d]", len(raw)) + } + return binary.BigEndian.Uint32(raw), nil +} diff --git a/platform/common/core/generic/vault/helpers.go b/platform/common/core/generic/vault/helpers.go index 1a4e8f45a..a95286702 100644 --- a/platform/common/core/generic/vault/helpers.go +++ b/platform/common/core/generic/vault/helpers.go @@ -8,10 +8,11 @@ package vault import ( "context" + "encoding/binary" "testing" "github.com/hyperledger-labs/fabric-smart-client/pkg/utils/proto" - driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections" "github.com/hyperledger/fabric-protos-go/ledger/rwset" "github.com/stretchr/testify/assert" @@ -131,16 +132,16 @@ func TTestInterceptorConcurrency(t *testing.T, ddb VersionedPersistence, vp arti err = ddb.BeginUpdate() assert.NoError(t, err) - err = ddb.SetState(ns, k, VersionedValue{Raw: []byte("val"), Block: 35, TxNum: 1}) + err = ddb.SetState(ns, k, VersionedValue{Raw: []byte("val"), Version: versionBlockTxNumToBytes(35, 1)}) assert.NoError(t, err) err = ddb.Commit() assert.NoError(t, err) _, _, err = rws.GetReadAt(ns, 0) - assert.EqualError(t, err, "invalid read [namespace:key1]: previous value returned at version 0:0, current value at version 35:1") + assert.EqualError(t, err, "invalid read [namespace:key1]: previous value returned at version [[]], current value at version [[0 0 0 35 0 0 0 1]]") _, err = rws.GetState(ns, k) - assert.EqualError(t, err, "invalid read [namespace:key1]: previous value returned at version 0:0, current value at version 35:1") + assert.EqualError(t, err, "invalid read [namespace:key1]: previous value returned at version [[]], current value at version [[0 0 0 35 0 0 0 1]]") mv, err := rws.GetStateMetadata(ns, mk) assert.NoError(t, err) @@ -148,13 +149,13 @@ func TTestInterceptorConcurrency(t *testing.T, ddb VersionedPersistence, vp arti err = ddb.BeginUpdate() assert.NoError(t, err) - err = ddb.SetStateMetadata(ns, mk, map[string][]byte{"k": []byte("v")}, 36, 2) + err = ddb.SetStateMetadata(ns, mk, map[string][]byte{"k": []byte("v")}, versionBlockTxNumToBytes(36, 1)) assert.NoError(t, err) err = ddb.Commit() assert.NoError(t, err) _, err = rws.GetStateMetadata(ns, mk) - assert.EqualError(t, err, "invalid metadata read: previous value returned at version 0:0, current value at version 36:2") + assert.EqualError(t, err, "invalid metadata read: previous value returned at version [[]], current value at version [[0 0 0 36 0 0 0 1]]") } func TTestParallelVaults(t *testing.T, ddb VersionedPersistence, vp artifactsProvider) { @@ -202,15 +203,15 @@ func TTestParallelVaults(t *testing.T, ddb VersionedPersistence, vp artifactsPro assert.NoError(t, err) assert.Equal(t, []byte("val_v1"), val) assert.Equal(t, map[string][]byte{"k1": []byte("mval1_v1")}, mval) - assert.Equal(t, uint64(1), txNum) - assert.Equal(t, uint64(2), blkNum) + assert.Equal(t, uint64(2), txNum) + assert.Equal(t, uint64(1), blkNum) val, mval, txNum, blkNum, err = queryVault(vault2, ns, k, mk) assert.NoError(t, err) assert.Equal(t, []byte("val_v1"), val) assert.Equal(t, map[string][]byte{"k1": []byte("mval1_v1")}, mval) - assert.Equal(t, uint64(1), txNum) - assert.Equal(t, uint64(2), blkNum) + assert.Equal(t, uint64(2), txNum) + assert.Equal(t, uint64(1), blkNum) } func TTestDeadlock(t *testing.T, ddb VersionedPersistence, vp artifactsProvider) { @@ -243,8 +244,8 @@ func TTestDeadlock(t *testing.T, ddb VersionedPersistence, vp artifactsProvider) assert.NoError(t, err) assert.Equal(t, []byte("val_v1"), val) assert.Equal(t, map[string][]byte{"k1": []byte("mval1_v1")}, mval) - assert.Equal(t, uint64(1), txNum) - assert.Equal(t, uint64(2), blkNum) + assert.Equal(t, uint64(2), txNum) + assert.Equal(t, uint64(1), blkNum) } func TTestQueryExecutor(t *testing.T, ddb VersionedPersistence, vp artifactsProvider) { @@ -255,13 +256,13 @@ func TTestQueryExecutor(t *testing.T, ddb VersionedPersistence, vp artifactsProv err = ddb.BeginUpdate() assert.NoError(t, err) - err = ddb.SetState(ns, "k2", VersionedValue{Raw: []byte("k2_value"), Block: 35, TxNum: 1}) + err = ddb.SetState(ns, "k2", VersionedValue{Raw: []byte("k2_value"), Version: versionBlockTxNumToBytes(35, 1)}) assert.NoError(t, err) - err = ddb.SetState(ns, "k3", VersionedValue{Raw: []byte("k3_value"), Block: 35, TxNum: 2}) + err = ddb.SetState(ns, "k3", VersionedValue{Raw: []byte("k3_value"), Version: versionBlockTxNumToBytes(35, 2)}) assert.NoError(t, err) - err = ddb.SetState(ns, "k1", VersionedValue{Raw: []byte("k1_value"), Block: 35, TxNum: 3}) + err = ddb.SetState(ns, "k1", VersionedValue{Raw: []byte("k1_value"), Version: versionBlockTxNumToBytes(35, 3)}) assert.NoError(t, err) - err = ddb.SetState(ns, "k111", VersionedValue{Raw: []byte("k111_value"), Block: 35, TxNum: 4}) + err = ddb.SetState(ns, "k111", VersionedValue{Raw: []byte("k111_value"), Version: versionBlockTxNumToBytes(35, 4)}) assert.NoError(t, err) err = ddb.Commit() assert.NoError(t, err) @@ -288,10 +289,10 @@ func TTestQueryExecutor(t *testing.T, ddb VersionedPersistence, vp artifactsProv } assert.Len(t, res, 4) assert.ElementsMatch(t, []VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, TxNum: 4}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, TxNum: 1}, - {Key: "k3", Raw: []byte("k3_value"), Block: 35, TxNum: 2}, + {Key: "k1", Raw: []byte("k1_value"), Version: versionBlockTxNumToBytes(35, 3)}, + {Key: "k111", Raw: []byte("k111_value"), Version: versionBlockTxNumToBytes(35, 4)}, + {Key: "k2", Raw: []byte("k2_value"), Version: versionBlockTxNumToBytes(35, 1)}, + {Key: "k3", Raw: []byte("k3_value"), Version: versionBlockTxNumToBytes(35, 2)}, }, res) itr, err = ddb.GetStateRangeScanIterator(ns, "k1", "k3") @@ -305,9 +306,9 @@ func TTestQueryExecutor(t *testing.T, ddb VersionedPersistence, vp artifactsProv } assert.Len(t, res, 3) assert.Equal(t, []VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, TxNum: 4}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, TxNum: 1}, + {Key: "k1", Raw: []byte("k1_value"), Version: versionBlockTxNumToBytes(35, 3)}, + {Key: "k111", Raw: []byte("k111_value"), Version: versionBlockTxNumToBytes(35, 4)}, + {Key: "k2", Raw: []byte("k2_value"), Version: versionBlockTxNumToBytes(35, 1)}, }, res) itr, err = ddb.GetStateSetIterator(ns, "k1", "k2", "k111") @@ -321,9 +322,9 @@ func TTestQueryExecutor(t *testing.T, ddb VersionedPersistence, vp artifactsProv } assert.Len(t, res, 3) assert.ElementsMatch(t, []VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, TxNum: 1}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, TxNum: 4}, + {Key: "k1", Raw: []byte("k1_value"), Version: versionBlockTxNumToBytes(35, 3)}, + {Key: "k2", Raw: []byte("k2_value"), Version: versionBlockTxNumToBytes(35, 1)}, + {Key: "k111", Raw: []byte("k111_value"), Version: versionBlockTxNumToBytes(35, 4)}, }, res) itr, err = ddb.GetStateSetIterator(ns, "k1", "k5") @@ -336,7 +337,7 @@ func TTestQueryExecutor(t *testing.T, ddb VersionedPersistence, vp artifactsProv res = append(res, *n) } var expected = RemoveNils([]VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, + {Key: "k1", Raw: []byte("k1_value"), Version: versionBlockTxNumToBytes(35, 3)}, }) assert.Equal(t, expected, res) } @@ -349,9 +350,9 @@ func TTestShardLikeCommit(t *testing.T, ddb VersionedPersistence, vp artifactsPr // Populate the DB with some data at some height err := ddb.BeginUpdate() assert.NoError(t, err) - err = ddb.SetState(ns, k1, VersionedValue{Raw: []byte("k1val"), Block: 35, TxNum: 1}) + err = ddb.SetState(ns, k1, VersionedValue{Raw: []byte("k1val"), Version: versionBlockTxNumToBytes(35, 1)}) assert.NoError(t, err) - err = ddb.SetState(ns, k2, VersionedValue{Raw: []byte("k2val"), Block: 37, TxNum: 3}) + err = ddb.SetState(ns, k2, VersionedValue{Raw: []byte("k2val"), Version: versionBlockTxNumToBytes(37, 3)}) assert.NoError(t, err) err = ddb.Commit() assert.NoError(t, err) @@ -363,7 +364,7 @@ func TTestShardLikeCommit(t *testing.T, ddb VersionedPersistence, vp artifactsPr // create the read-write set rwsb := &ReadWriteSet{ ReadSet: ReadSet{ - Reads: map[driver2.Namespace]NamespaceReads{}, + Reads: map[driver.Namespace]NamespaceReads{}, OrderedReads: map[string][]string{}, }, WriteSet: WriteSet{ @@ -374,8 +375,8 @@ func TTestShardLikeCommit(t *testing.T, ddb VersionedPersistence, vp artifactsPr MetaWrites: map[string]KeyedMetaWrites{}, }, } - rwsb.ReadSet.Add(ns, k1, 35, 1) - rwsb.ReadSet.Add(ns, k2, 37, 2) + rwsb.ReadSet.Add(ns, k1, versionBlockTxNumToBytes(35, 1)) + rwsb.ReadSet.Add(ns, k2, versionBlockTxNumToBytes(37, 2)) rwsb.WriteSet.Add(ns, k1, []byte("k1FromTxidInvalid")) rwsb.WriteSet.Add(ns, k2, []byte("k2FromTxidInvalid")) marshaller := vp.NewMarshaller() @@ -386,7 +387,7 @@ func TTestShardLikeCommit(t *testing.T, ddb VersionedPersistence, vp artifactsPr rwset, err := aVault.GetRWSet("txid-invalid", rwsBytes) assert.NoError(t, err) err = rwset.IsValid() - assert.EqualError(t, err, "invalid read: vault at version namespace:key2 37:3, read-write set at version 37:2") + assert.EqualError(t, err, "invalid read: vault at version namespace:key2 [{[107 50 118 97 108] [0 0 0 37 0 0 0 3]}], read-write set at version [[0 0 0 37 0 0 0 2]]") // close the read-write set, even in case of error rwset.Done() @@ -409,7 +410,7 @@ func TTestShardLikeCommit(t *testing.T, ddb VersionedPersistence, vp artifactsPr // create the read-write set rwsb = &ReadWriteSet{ ReadSet: ReadSet{ - Reads: map[driver2.Namespace]NamespaceReads{}, + Reads: map[driver.Namespace]NamespaceReads{}, OrderedReads: map[string][]string{}, }, WriteSet: WriteSet{ @@ -420,8 +421,8 @@ func TTestShardLikeCommit(t *testing.T, ddb VersionedPersistence, vp artifactsPr MetaWrites: map[string]KeyedMetaWrites{}, }, } - rwsb.ReadSet.Add(ns, k1, 35, 1) - rwsb.ReadSet.Add(ns, k2, 37, 3) + rwsb.ReadSet.Add(ns, k1, versionBlockTxNumToBytes(35, 1)) + rwsb.ReadSet.Add(ns, k2, versionBlockTxNumToBytes(37, 3)) rwsb.WriteSet.Add(ns, k1, []byte("k1FromTxidValid")) rwsb.WriteSet.Add(ns, k2, []byte("k2FromTxidValid")) rwsBytes, err = marshaller.Marshal(rwsb) @@ -455,11 +456,11 @@ func TTestShardLikeCommit(t *testing.T, ddb VersionedPersistence, vp artifactsPr // check the content of the kvs after that vv, err := ddb.GetState(ns, k1) assert.NoError(t, err) - assert.Equal(t, VersionedValue{Raw: []byte("k1FromTxidValid"), Block: 38, TxNum: 10}, vv) + assert.Equal(t, VersionedValue{Raw: []byte("k1FromTxidValid"), Version: versionBlockTxNumToBytes(38, 10)}, vv) vv, err = ddb.GetState(ns, k2) assert.NoError(t, err) - assert.Equal(t, VersionedValue{Raw: []byte("k2FromTxidValid"), Block: 38, TxNum: 10}, vv) + assert.Equal(t, VersionedValue{Raw: []byte("k2FromTxidValid"), Version: versionBlockTxNumToBytes(38, 10)}, vv) // all Interceptors should be gone assert.Len(t, aVault.Interceptors, 0) @@ -475,11 +476,11 @@ func TTestVaultErr(t *testing.T, ddb VersionedPersistence, vp artifactsProvider) rws := &ReadWriteSet{ ReadSet: ReadSet{ - Reads: map[driver2.Namespace]NamespaceReads{}, + Reads: map[driver.Namespace]NamespaceReads{}, OrderedReads: map[string][]string{}, }, } - rws.ReadSet.Add("pineapple", "key", 35, 1) + rws.ReadSet.Add("pineapple", "key", versionBlockTxNumToBytes(35, 1)) m := vp.NewMarshaller() rwsBytes, err := m.Marshal(rws) assert.NoError(t, err) @@ -526,7 +527,7 @@ func TTestMerge(t *testing.T, ddb VersionedPersistence, vp artifactsProvider) { assert.NoError(t, err) err = ddb.BeginUpdate() assert.NoError(t, err) - err = ddb.SetState(ns, k1, VersionedValue{Raw: []byte("v1"), Block: 35, TxNum: 1}) + err = ddb.SetState(ns, k1, VersionedValue{Raw: []byte("v1"), Version: versionBlockTxNumToBytes(35, 1)}) assert.NoError(t, err) err = ddb.Commit() assert.NoError(t, err) @@ -546,7 +547,7 @@ func TTestMerge(t *testing.T, ddb VersionedPersistence, vp artifactsProvider) { rwsb := &ReadWriteSet{ ReadSet: ReadSet{ - Reads: map[driver2.Namespace]NamespaceReads{}, + Reads: map[driver.Namespace]NamespaceReads{}, OrderedReads: map[string][]string{}, }, WriteSet: WriteSet{ @@ -557,8 +558,8 @@ func TTestMerge(t *testing.T, ddb VersionedPersistence, vp artifactsProvider) { MetaWrites: map[string]KeyedMetaWrites{}, }, } - rwsb.ReadSet.Add(ns, k1, 35, 1) - rwsb.ReadSet.Add(ns, ne2Key, 0, 0) + rwsb.ReadSet.Add(ns, k1, versionBlockTxNumToBytes(35, 1)) + rwsb.ReadSet.Add(ns, ne2Key, nil) rwsb.WriteSet.Add(ns, k1, []byte("newv1")) rwsb.MetaWriteSet.Add(ns, k1, map[string][]byte{"k1": []byte("v1")}) m := vp.NewMarshaller() @@ -579,24 +580,24 @@ func TTestMerge(t *testing.T, ddb VersionedPersistence, vp artifactsProvider) { }}, rws.RWs().Writes) assert.Equal(t, Reads{ "namespace": { - "key1": {Block: 35, TxNum: 1}, - "notexist1": {Block: 0, TxNum: 0}, - "notexist2": {Block: 0, TxNum: 0}, + "key1": versionBlockTxNumToBytes(35, 1), + "notexist1": nil, + "notexist2": nil, }, }, rws.RWs().Reads) rwsb = &ReadWriteSet{ ReadSet: ReadSet{ - Reads: map[driver2.Namespace]NamespaceReads{}, + Reads: map[driver.Namespace]NamespaceReads{}, OrderedReads: map[string][]string{}, }, } - rwsb.ReadSet.Add(ns, k1, 36, 1) + rwsb.ReadSet.Add(ns, k1, versionBlockTxNumToBytes(36, 1)) rwsBytes, err = m.Marshal(rwsb) assert.NoError(t, err) err = rws.AppendRWSet(rwsBytes) - assert.EqualError(t, err, "invalid read [namespace:key1]: previous value returned at version 35:1, current value at version 35:1") + assert.EqualError(t, err, "invalid read [namespace:key1]: previous value returned at version [[0 0 0 36 0 0 0 1]], current value at version [[0 0 0 35 0 0 0 1]]") rwsb = &ReadWriteSet{ WriteSet: WriteSet{ @@ -649,7 +650,7 @@ func TTestInspector(t *testing.T, ddb VersionedPersistence, vp artifactsProvider assert.NoError(t, err) err = ddb.BeginUpdate() assert.NoError(t, err) - err = ddb.SetState(ns, k1, VersionedValue{Raw: []byte("v1"), Block: 35, TxNum: 1}) + err = ddb.SetState(ns, k1, VersionedValue{Raw: []byte("v1"), Version: versionBlockTxNumToBytes(35, 1)}) assert.NoError(t, err) err = ddb.Commit() assert.NoError(t, err) @@ -715,18 +716,18 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) // create and populate 2 DBs err := db1.BeginUpdate() assert.NoError(t, err) - err = db1.SetState(ns, k1, VersionedValue{Raw: []byte("v1"), Block: 35, TxNum: 1}) + err = db1.SetState(ns, k1, VersionedValue{Raw: []byte("v1"), Version: versionBlockTxNumToBytes(35, 1)}) assert.NoError(t, err) - err = db1.SetStateMetadata(ns, k1Meta, map[string][]byte{"metakey": []byte("metavalue")}, 35, 1) + err = db1.SetStateMetadata(ns, k1Meta, map[string][]byte{"metakey": []byte("metavalue")}, nil) assert.NoError(t, err) err = db1.Commit() assert.NoError(t, err) err = db2.BeginUpdate() assert.NoError(t, err) - err = db2.SetState(ns, k1, VersionedValue{Raw: []byte("v1"), Block: 35, TxNum: 1}) + err = db2.SetState(ns, k1, VersionedValue{Raw: []byte("v1"), Version: versionBlockTxNumToBytes(35, 1)}) assert.NoError(t, err) - err = db2.SetStateMetadata(ns, k1Meta, map[string][]byte{"metakey": []byte("metavalue")}, 35, 1) + err = db2.SetStateMetadata(ns, k1Meta, map[string][]byte{"metakey": []byte("metavalue")}, nil) assert.NoError(t, err) err = db2.Commit() assert.NoError(t, err) @@ -747,7 +748,7 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) rws2.Done() // GET K1 - v, err := rws.GetState(ns, k1, driver2.FromIntermediate) + v, err := rws.GetState(ns, k1, driver.FromIntermediate) assert.NoError(t, err) assert.Nil(t, v) @@ -755,12 +756,12 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) assert.NoError(t, err) assert.Equal(t, []byte("v1"), v) - v, err = rws.GetState(ns, k1, driver2.FromBoth) + v, err = rws.GetState(ns, k1, driver.FromBoth) assert.NoError(t, err) assert.Equal(t, []byte("v1"), v) // GET K1Meta - vMap, err := rws.GetStateMetadata(ns, k1Meta, driver2.FromIntermediate) + vMap, err := rws.GetStateMetadata(ns, k1Meta, driver.FromIntermediate) assert.NoError(t, err) assert.Nil(t, vMap) @@ -768,7 +769,7 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromBoth) + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver.FromBoth) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) @@ -777,15 +778,15 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) assert.NoError(t, err) // GET K1 after setting it - v, err = rws.GetState(ns, k1, driver2.FromIntermediate) + v, err = rws.GetState(ns, k1, driver.FromIntermediate) assert.NoError(t, err) assert.Equal(t, []byte("v1_updated"), v) - v, err = rws.GetState(ns, k1, driver2.FromStorage) + v, err = rws.GetState(ns, k1, driver.FromStorage) assert.NoError(t, err) assert.Equal(t, []byte("v1"), v) - v, err = rws.GetState(ns, k1, driver2.FromBoth) + v, err = rws.GetState(ns, k1, driver.FromBoth) assert.NoError(t, err) assert.Equal(t, []byte("v1_updated"), v) @@ -794,7 +795,7 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) assert.NoError(t, err) // GET K1Meta after setting it - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromIntermediate) + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver.FromIntermediate) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) @@ -802,20 +803,20 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromBoth) + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver.FromBoth) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) // GET K2 - v, err = rws.GetState(ns, k2, driver2.FromIntermediate) + v, err = rws.GetState(ns, k2, driver.FromIntermediate) assert.NoError(t, err) assert.Nil(t, v) - v, err = rws.GetState(ns, k2, driver2.FromStorage) + v, err = rws.GetState(ns, k2, driver.FromStorage) assert.NoError(t, err) assert.Nil(t, v) - v, err = rws.GetState(ns, k2, driver2.FromBoth) + v, err = rws.GetState(ns, k2, driver.FromBoth) assert.NoError(t, err) assert.Nil(t, v) @@ -824,15 +825,15 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) assert.NoError(t, err) // GET K2 after setting it - v, err = rws.GetState(ns, k2, driver2.FromIntermediate) + v, err = rws.GetState(ns, k2, driver.FromIntermediate) assert.NoError(t, err) assert.Equal(t, []byte("v2_updated"), v) - v, err = rws.GetState(ns, k2, driver2.FromStorage) + v, err = rws.GetState(ns, k2, driver.FromStorage) assert.NoError(t, err) assert.Nil(t, v) - v, err = rws.GetState(ns, k2, driver2.FromBoth) + v, err = rws.GetState(ns, k2, driver.FromBoth) assert.NoError(t, err) assert.Equal(t, []byte("v2_updated"), v) @@ -886,33 +887,33 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) assert.EqualError(t, err, "no write at position 2 for namespace namespace") // GET K1 - v, err = rws.GetState(ns, k1, driver2.FromIntermediate) + v, err = rws.GetState(ns, k1, driver.FromIntermediate) assert.NoError(t, err) assert.Equal(t, []byte("v1_updated"), v) - v, err = rws.GetState(ns, k1, driver2.FromStorage) + v, err = rws.GetState(ns, k1, driver.FromStorage) assert.NoError(t, err) assert.Equal(t, []byte("v1"), v) - v, err = rws.GetState(ns, k1, driver2.FromBoth) + v, err = rws.GetState(ns, k1, driver.FromBoth) assert.NoError(t, err) assert.Equal(t, []byte("v1_updated"), v) // GET K2 - v, err = rws.GetState(ns, k2, driver2.FromIntermediate) + v, err = rws.GetState(ns, k2, driver.FromIntermediate) assert.NoError(t, err) assert.Equal(t, []byte("v2_updated"), v) - v, err = rws.GetState(ns, k2, driver2.FromStorage) + v, err = rws.GetState(ns, k2, driver.FromStorage) assert.NoError(t, err) assert.Nil(t, v) - v, err = rws.GetState(ns, k2, driver2.FromBoth) + v, err = rws.GetState(ns, k2, driver.FromBoth) assert.NoError(t, err) assert.Equal(t, []byte("v2_updated"), v) // GET K1Meta - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromIntermediate) + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver.FromIntermediate) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) @@ -920,7 +921,7 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromBoth) + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver.FromBoth) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) @@ -929,15 +930,15 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) assert.NoError(t, err) // GET K1 - v, err = rws.GetState(ns, k1, driver2.FromIntermediate) + v, err = rws.GetState(ns, k1, driver.FromIntermediate) assert.NoError(t, err) assert.Nil(t, v) - v, err = rws.GetState(ns, k1, driver2.FromStorage) + v, err = rws.GetState(ns, k1, driver.FromStorage) assert.NoError(t, err) assert.Equal(t, []byte("v1"), v) - v, err = rws.GetState(ns, k1, driver2.FromBoth) + v, err = rws.GetState(ns, k1, driver.FromBoth) assert.NoError(t, err) assert.Nil(t, v) @@ -988,33 +989,33 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) assert.EqualError(t, err, "no write at position 2 for namespace namespace") // GET K2 - v, err = rws.GetState(ns, k2, driver2.FromIntermediate) + v, err = rws.GetState(ns, k2, driver.FromIntermediate) assert.NoError(t, err) assert.Equal(t, []byte("v2_updated"), v) - v, err = rws.GetState(ns, k2, driver2.FromStorage) + v, err = rws.GetState(ns, k2, driver.FromStorage) assert.NoError(t, err) assert.Nil(t, v) - v, err = rws.GetState(ns, k2, driver2.FromBoth) + v, err = rws.GetState(ns, k2, driver.FromBoth) assert.NoError(t, err) assert.Equal(t, []byte("v2_updated"), v) // GET K1 - v, err = rws.GetState(ns, k1, driver2.FromIntermediate) + v, err = rws.GetState(ns, k1, driver.FromIntermediate) assert.NoError(t, err) assert.Nil(t, v) - v, err = rws.GetState(ns, k1, driver2.FromStorage) + v, err = rws.GetState(ns, k1, driver.FromStorage) assert.NoError(t, err) assert.Equal(t, []byte("v1"), v) - v, err = rws.GetState(ns, k1, driver2.FromBoth) + v, err = rws.GetState(ns, k1, driver.FromBoth) assert.NoError(t, err) assert.Nil(t, v) // GET K1Meta - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromIntermediate) + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver.FromIntermediate) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) @@ -1022,7 +1023,7 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromBoth) + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver.FromBoth) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) @@ -1068,20 +1069,24 @@ func TTestRun(t *testing.T, db1, db2 VersionedPersistence, vp artifactsProvider) vv2, err := db2.GetState(ns, k1) assert.NoError(t, err) assert.Nil(t, vv1.Raw) - assert.Zero(t, vv1.Block) - assert.Zero(t, vv1.TxNum) + assert.Zero(t, vv1.Version) assert.Equal(t, vv1, vv2) vv1, err = db1.GetState(ns, k2) assert.NoError(t, err) vv2, err = db2.GetState(ns, k2) assert.NoError(t, err) - assert.Equal(t, VersionedValue{Raw: []byte("v2_updated"), Block: 35, TxNum: 2}, vv1) + assert.Equal(t, VersionedValue{Raw: []byte("v2_updated"), Version: versionBlockTxNumToBytes(35, 2)}, vv1) assert.Equal(t, vv1, vv2) - meta1, b1, t1, err := db1.GetStateMetadata(ns, k1Meta) + meta1, ver1, err := db1.GetStateMetadata(ns, k1Meta) assert.NoError(t, err) - meta2, b2, t2, err := db2.GetStateMetadata(ns, k1Meta) + versionMarshaller := BlockTxIndexVersionMarshaller{} + b1, t1, err := versionMarshaller.FromBytes(ver1) + assert.NoError(t, err) + meta2, ver2, err := db2.GetStateMetadata(ns, k1Meta) + assert.NoError(t, err) + b2, t2, err := versionMarshaller.FromBytes(ver2) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, meta1) assert.Equal(t, uint64(35), b1) @@ -1115,7 +1120,7 @@ func compare(t *testing.T, ns string, db1, db2 VersionedPersistence) { assert.Equal(t, res1, res2) } -func queryVault(v *Vault[ValidationCode], ns driver2.Namespace, key driver2.PKey, mkey driver2.MKey) (driver2.RawValue, driver2.Metadata, driver2.BlockNum, driver2.TxNum, error) { +func queryVault(v *Vault[ValidationCode], ns driver.Namespace, key driver.PKey, mkey driver.MKey) (driver.RawValue, driver.Metadata, driver.TxNum, driver.BlockNum, error) { qe, err := v.NewQueryExecutor() defer qe.Done() if err != nil { @@ -1125,7 +1130,11 @@ func queryVault(v *Vault[ValidationCode], ns driver2.Namespace, key driver2.PKey if err != nil { return nil, nil, 0, 0, err } - mval, txNum, blkNum, err := qe.GetStateMetadata(ns, mkey) + mval, kVersion, err := qe.GetStateMetadata(ns, mkey) + if err != nil { + return nil, nil, 0, 0, err + } + blkNum, txNum, err := BlockTxIndexVersionMarshaller{}.FromBytes(kVersion) if err != nil { return nil, nil, 0, 0, err } @@ -1138,19 +1147,19 @@ type deadlockErrorPersistence struct { key string } -func (db *deadlockErrorPersistence) GetState(namespace driver2.Namespace, key driver2.PKey) (VersionedValue, error) { +func (db *deadlockErrorPersistence) GetState(namespace driver.Namespace, key driver.PKey) (VersionedValue, error) { return db.VersionedPersistence.GetState(namespace, key) } -func (db *deadlockErrorPersistence) GetStateRangeScanIterator(namespace driver2.Namespace, startKey, endKey driver2.PKey) (collections.Iterator[*VersionedRead], error) { +func (db *deadlockErrorPersistence) GetStateRangeScanIterator(namespace driver.Namespace, startKey, endKey driver.PKey) (collections.Iterator[*VersionedRead], error) { return db.VersionedPersistence.GetStateRangeScanIterator(namespace, startKey, endKey) } -func (db *deadlockErrorPersistence) GetStateSetIterator(ns driver2.Namespace, keys ...driver2.PKey) (collections.Iterator[*VersionedRead], error) { +func (db *deadlockErrorPersistence) GetStateSetIterator(ns driver.Namespace, keys ...driver.PKey) (collections.Iterator[*VersionedRead], error) { return db.VersionedPersistence.GetStateSetIterator(ns, keys...) } -func (db *deadlockErrorPersistence) SetState(namespace driver2.Namespace, key driver2.PKey, value VersionedValue) error { +func (db *deadlockErrorPersistence) SetState(namespace driver.Namespace, key driver.PKey, value VersionedValue) error { if key == db.key && db.failures > 0 { db.failures-- return DeadlockDetected @@ -1158,8 +1167,8 @@ func (db *deadlockErrorPersistence) SetState(namespace driver2.Namespace, key dr return db.VersionedPersistence.SetState(namespace, key, value) } -func (db *deadlockErrorPersistence) SetStates(namespace driver2.Namespace, kvs map[driver2.PKey]VersionedValue) map[driver2.PKey]error { - errs := make(map[driver2.PKey]error) +func (db *deadlockErrorPersistence) SetStates(namespace driver.Namespace, kvs map[driver.PKey]VersionedValue) map[driver.PKey]error { + errs := make(map[driver.PKey]error) for k, v := range kvs { if err := db.SetState(namespace, k, v); err != nil { errs[k] = err @@ -1168,8 +1177,8 @@ func (db *deadlockErrorPersistence) SetStates(namespace driver2.Namespace, kvs m return errs } -func (db *deadlockErrorPersistence) DeleteStates(namespace driver2.Namespace, keys ...driver2.PKey) map[driver2.PKey]error { - errs := make(map[driver2.PKey]error) +func (db *deadlockErrorPersistence) DeleteStates(namespace driver.Namespace, keys ...driver.PKey) map[driver.PKey]error { + errs := make(map[driver.PKey]error) for _, key := range keys { if err := db.DeleteState(namespace, key); err != nil { errs[key] = err @@ -1182,20 +1191,20 @@ type duplicateErrorPersistence struct { VersionedPersistence } -func (db *duplicateErrorPersistence) SetState(driver2.Namespace, driver2.PKey, VersionedValue) error { +func (db *duplicateErrorPersistence) SetState(driver.Namespace, driver.PKey, VersionedValue) error { return UniqueKeyViolation } -func (db *duplicateErrorPersistence) SetStates(_ driver2.Namespace, kvs map[driver2.PKey]VersionedValue) map[driver2.PKey]error { - errs := make(map[driver2.PKey]error, len(kvs)) +func (db *duplicateErrorPersistence) SetStates(_ driver.Namespace, kvs map[driver.PKey]VersionedValue) map[driver.PKey]error { + errs := make(map[driver.PKey]error, len(kvs)) for k := range kvs { errs[k] = UniqueKeyViolation } return errs } -func (db *duplicateErrorPersistence) DeleteStates(namespace driver2.Namespace, keys ...driver2.PKey) map[driver2.PKey]error { - errs := make(map[driver2.PKey]error) +func (db *duplicateErrorPersistence) DeleteStates(namespace driver.Namespace, keys ...driver.PKey) map[driver.PKey]error { + errs := make(map[driver.PKey]error) for _, key := range keys { if err := db.DeleteState(namespace, key); err != nil { errs[key] = err @@ -1204,14 +1213,21 @@ func (db *duplicateErrorPersistence) DeleteStates(namespace driver2.Namespace, k return errs } -func (db *duplicateErrorPersistence) GetState(namespace driver2.Namespace, key driver2.PKey) (VersionedValue, error) { +func (db *duplicateErrorPersistence) GetState(namespace driver.Namespace, key driver.PKey) (VersionedValue, error) { return db.VersionedPersistence.GetState(namespace, key) } -func (db *duplicateErrorPersistence) GetStateRangeScanIterator(namespace driver2.Namespace, startKey, endKey driver2.PKey) (collections.Iterator[*VersionedRead], error) { +func (db *duplicateErrorPersistence) GetStateRangeScanIterator(namespace driver.Namespace, startKey, endKey driver.PKey) (collections.Iterator[*VersionedRead], error) { return db.VersionedPersistence.GetStateRangeScanIterator(namespace, startKey, endKey) } -func (db *duplicateErrorPersistence) GetStateSetIterator(ns driver2.Namespace, keys ...driver2.PKey) (collections.Iterator[*VersionedRead], error) { +func (db *duplicateErrorPersistence) GetStateSetIterator(ns driver.Namespace, keys ...driver.PKey) (collections.Iterator[*VersionedRead], error) { return db.VersionedPersistence.GetStateSetIterator(ns, keys...) } + +func versionBlockTxNumToBytes(Block driver.BlockNum, TxNum driver.TxNum) []byte { + buf := make([]byte, 8) + binary.BigEndian.PutUint32(buf[:4], uint32(Block)) + binary.BigEndian.PutUint32(buf[4:], uint32(TxNum)) + return buf +} diff --git a/platform/common/core/generic/vault/interceptor.go b/platform/common/core/generic/vault/interceptor.go index d205435a9..0e1163986 100644 --- a/platform/common/core/generic/vault/interceptor.go +++ b/platform/common/core/generic/vault/interceptor.go @@ -15,20 +15,25 @@ import ( ) type VersionedQueryExecutor interface { - GetStateMetadata(namespace, key string) (map[string][]byte, uint64, uint64, error) + GetStateMetadata(namespace, key string) (driver.Metadata, driver.RawVersion, error) GetState(namespace, key string) (VersionedValue, error) Done() } +type VersionComparator interface { + Equal(v1, v2 driver.RawVersion) bool +} + type Interceptor[V driver.ValidationCode] struct { - Logger Logger - QE VersionedQueryExecutor - TxIDStore TXIDStoreReader[V] - Rws ReadWriteSet - Marshaller Marshaller - Closed bool - TxID string - vcProvider driver.ValidationCodeProvider[V] // TODO + Logger Logger + QE VersionedQueryExecutor + TxIDStore TXIDStoreReader[V] + Rws ReadWriteSet + Marshaller Marshaller + VersionComparator VersionComparator + Closed bool + TxID string + vcProvider driver.ValidationCodeProvider[V] // TODO sync.RWMutex } @@ -55,17 +60,19 @@ func NewInterceptor[V driver.ValidationCode]( txID driver.TxID, vcProvider driver.ValidationCodeProvider[V], marshaller Marshaller, + versionComparator VersionComparator, ) *Interceptor[V] { logger.Debugf("new interceptor [%s]", txID) return &Interceptor[V]{ - Logger: logger, - TxID: txID, - QE: qe, - TxIDStore: txIDStore, - Rws: EmptyRWSet(), - vcProvider: vcProvider, - Marshaller: marshaller, + Logger: logger, + TxID: txID, + QE: qe, + TxIDStore: txIDStore, + Rws: EmptyRWSet(), + vcProvider: vcProvider, + Marshaller: marshaller, + VersionComparator: versionComparator, } } @@ -89,9 +96,8 @@ func (i *Interceptor[V]) IsValid() error { if err != nil { return err } - - if vv.Block != v.Block || vv.TxNum != v.TxNum { - return errors.Errorf("invalid read: vault at version %s:%s %d:%d, read-write set at version %d:%d", ns, k, vv.Block, vv.TxNum, v.Block, v.TxNum) + if !i.VersionComparator.Equal(v, vv.Version) { + return errors.Errorf("invalid read: vault at version %s:%s [%v], read-write set at version [%v]", ns, k, vv, v) } } } @@ -236,20 +242,20 @@ func (i *Interceptor[V]) GetStateMetadata(namespace, key string, opts ...driver. i.RUnlock() return nil, errors.New("this instance is write only") } - val, block, txnum, err := i.QE.GetStateMetadata(namespace, key) + val, vaultVersion, err := i.QE.GetStateMetadata(namespace, key) if err != nil { i.RUnlock() return nil, err } i.RUnlock() - b, t, in := i.Rws.ReadSet.Get(namespace, key) + version, in := i.Rws.ReadSet.Get(namespace, key) if in { - if b != block || t != txnum { - return nil, errors.Errorf("invalid metadata read: previous value returned at version %d:%d, current value at version %d:%d", b, t, block, txnum) + if !i.VersionComparator.Equal(version, vaultVersion) { + return nil, errors.Errorf("invalid metadata read: previous value returned at version [%v], current value at version [%v]", version, vaultVersion) } } else { - i.Rws.ReadSet.Add(namespace, key, block, txnum) + i.Rws.ReadSet.Add(namespace, key, vaultVersion) } return val, nil @@ -297,14 +303,15 @@ func (i *Interceptor[V]) GetState(namespace driver.Namespace, key driver.PKey, o return nil, err } i.RUnlock() + vaultVersion := vv.Version - b, t, in := i.Rws.ReadSet.Get(namespace, key) + version, in := i.Rws.ReadSet.Get(namespace, key) if in { - if b != vv.Block || t != vv.TxNum { - return nil, errors.Errorf("invalid read [%s:%s]: previous value returned at version %d:%d, current value at version %d:%d", namespace, key, b, t, vv.Block, vv.TxNum) + if !i.VersionComparator.Equal(version, vaultVersion) { + return nil, errors.Errorf("invalid read [%s:%s]: previous value returned at version [%v], current value at version [%v]", namespace, key, version, vaultVersion) } } else { - i.Rws.ReadSet.Add(namespace, key, vv.Block, vv.TxNum) + i.Rws.ReadSet.Add(namespace, key, vaultVersion) } return vv.Raw, nil diff --git a/platform/common/core/generic/vault/interceptor_test.go b/platform/common/core/generic/vault/interceptor_test.go index 2de80bf76..4d4888f3c 100644 --- a/platform/common/core/generic/vault/interceptor_test.go +++ b/platform/common/core/generic/vault/interceptor_test.go @@ -19,9 +19,8 @@ import ( func newMockQE() mockQE { return mockQE{ State: VersionedValue{ - Raw: []byte("raw"), - Block: 1, - TxNum: 1, + Raw: []byte("raw"), + Version: blockTxIndexToBytes(1, 1), }, Metadata: map[string][]byte{ "md": []byte("meta"), @@ -34,8 +33,8 @@ type mockQE struct { Metadata map[string][]byte } -func (qe mockQE) GetStateMetadata(driver.Namespace, driver.PKey) (driver.Metadata, driver.BlockNum, driver.TxNum, error) { - return qe.Metadata, 1, 1, nil +func (qe mockQE) GetStateMetadata(driver.Namespace, driver.PKey) (driver.Metadata, driver.RawVersion, error) { + return qe.Metadata, blockTxIndexToBytes(1, 1), nil } func (qe mockQE) GetState(driver.Namespace, driver.PKey) (VersionedValue, error) { return qe.State, nil diff --git a/platform/common/core/generic/vault/queryexec.go b/platform/common/core/generic/vault/queryexec.go index 961e70fcc..9931263db 100644 --- a/platform/common/core/generic/vault/queryexec.go +++ b/platform/common/core/generic/vault/queryexec.go @@ -6,7 +6,9 @@ SPDX-License-Identifier: Apache-2.0 package vault -import "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" +import ( + "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" +) // this file contains all structs that perform DB access. They // differ in terms of the results that they return. They are both @@ -28,8 +30,12 @@ func (q *directQueryExecutor[V]) GetStateRangeScanIterator(namespace driver.Name return q.vault.store.GetStateRangeScanIterator(namespace, startKey, endKey) } -func (q *directQueryExecutor[V]) GetStateMetadata(namespace driver.Namespace, key driver.PKey) (driver.Metadata, driver.BlockNum, driver.TxNum, error) { - return q.vault.store.GetStateMetadata(namespace, key) +func (q *directQueryExecutor[V]) GetStateMetadata(namespace driver.Namespace, key driver.PKey) (driver.Metadata, driver.RawVersion, error) { + m, version, err := q.vault.store.GetStateMetadata(namespace, key) + if err != nil { + return nil, nil, err + } + return m, version, nil } func (q *directQueryExecutor[V]) Done() { @@ -46,8 +52,12 @@ func (i *interceptorQueryExecutor[V]) Done() { i.storeLock.RUnlock() } -func (i *interceptorQueryExecutor[V]) GetStateMetadata(namespace driver.Namespace, key driver.PKey) (driver.Metadata, driver.BlockNum, driver.TxNum, error) { - return i.store.GetStateMetadata(namespace, key) +func (i *interceptorQueryExecutor[V]) GetStateMetadata(namespace driver.Namespace, key driver.PKey) (driver.Metadata, driver.RawVersion, error) { + m, version, err := i.store.GetStateMetadata(namespace, key) + if err != nil { + return nil, nil, err + } + return m, version, nil } func (i *interceptorQueryExecutor[V]) GetState(namespace driver.Namespace, key driver.PKey) (VersionedValue, error) { diff --git a/platform/common/core/generic/vault/rwset.go b/platform/common/core/generic/vault/rwset.go index 255ebb4a7..e0a9ba80a 100644 --- a/platform/common/core/generic/vault/rwset.go +++ b/platform/common/core/generic/vault/rwset.go @@ -140,17 +140,12 @@ func (w *WriteSet) Clear(ns string) { w.OrderedWrites[ns] = []string{} } -type TxPosition struct { - Block driver.BlockNum - TxNum driver.TxNum -} +type Version = driver.RawVersion -type NamespaceReads map[string]TxPosition +type NamespaceReads map[string]Version func (r NamespaceReads) Equals(o NamespaceReads) error { - return entriesEqual(r, o, func(v, v2 TxPosition) bool { - return v.Block == v2.Block && v.TxNum == v2.TxNum - }) + return entriesEqual(r, o, bytes.Equal) } type Reads map[driver.Namespace]NamespaceReads @@ -164,22 +159,22 @@ type ReadSet struct { OrderedReads map[string][]string } -func (r *ReadSet) Add(ns driver.Namespace, key string, block driver.BlockNum, txnum driver.TxNum) { +func (r *ReadSet) Add(ns driver.Namespace, key string, version Version) { nsMap, in := r.Reads[ns] if !in { - nsMap = make(map[driver.Namespace]TxPosition) + nsMap = make(map[driver.Namespace]Version) r.Reads[ns] = nsMap r.OrderedReads[ns] = make([]string, 0, 8) } - nsMap[key] = TxPosition{block, txnum} + nsMap[key] = version r.OrderedReads[ns] = append(r.OrderedReads[ns], key) } -func (r *ReadSet) Get(ns driver.Namespace, key string) (driver.BlockNum, driver.TxNum, bool) { +func (r *ReadSet) Get(ns driver.Namespace, key string) (Version, bool) { entry, in := r.Reads[ns][key] - return entry.Block, entry.TxNum, in + return entry, in } func (r *ReadSet) GetAt(ns driver.Namespace, i int) (string, bool) { @@ -192,7 +187,7 @@ func (r *ReadSet) GetAt(ns driver.Namespace, i int) (string, bool) { } func (r *ReadSet) Clear(ns driver.Namespace) { - r.Reads[ns] = map[string]TxPosition{} + r.Reads[ns] = map[string]Version{} r.OrderedReads[ns] = []string{} } diff --git a/platform/common/core/generic/vault/vault.go b/platform/common/core/generic/vault/vault.go index 62e6bcff4..af97e558b 100644 --- a/platform/common/core/generic/vault/vault.go +++ b/platform/common/core/generic/vault/vault.go @@ -61,11 +61,29 @@ type NewInterceptorFunc[V driver.ValidationCode] func(logger Logger, qe Versione type ( VersionedPersistence = dbdriver.VersionedPersistence VersionedValue = dbdriver.VersionedValue + VersionedMetadataValue = dbdriver.VersionedMetadataValue VersionedRead = dbdriver.VersionedRead VersionedResultsIterator = dbdriver.VersionedResultsIterator QueryExecutor = dbdriver.QueryExecutor ) +type txCommitIndex struct { + ctx context.Context + txID driver.TxID + block driver.BlockNum + indexInBloc driver.TxNum +} + +type commitInput struct { + txCommitIndex + rws *ReadWriteSet +} + +type VersionBuilder interface { + VersionedValues(rws *ReadWriteSet, ns driver.Namespace, writes NamespaceWrites, block driver.BlockNum, indexInBloc driver.TxNum) (map[driver.PKey]VersionedValue, error) + VersionedMetaValues(rws *ReadWriteSet, ns driver.Namespace, writes KeyedMetaWrites, block driver.BlockNum, indexInBloc driver.TxNum) (map[driver.PKey]driver.VersionedMetadataValue, error) +} + var ( DeadlockDetected = dbdriver.DeadlockDetected UniqueKeyViolation = dbdriver.UniqueKeyViolation @@ -96,7 +114,8 @@ type Vault[V driver.ValidationCode] struct { populator Populator metrics *Metrics - commitBatcher runner.BatchRunner[txCommitIndex] + commitBatcher runner.BatchRunner[txCommitIndex] + versionBuilder VersionBuilder } // New returns a new instance of Vault @@ -109,6 +128,7 @@ func New[V driver.ValidationCode]( populator Populator, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider, + versionBuilder VersionBuilder, ) *Vault[V] { v := &Vault[V]{ logger: logger, @@ -119,6 +139,7 @@ func New[V driver.ValidationCode]( newInterceptor: newInterceptor, populator: populator, metrics: NewMetrics(metricsProvider, tracerProvider), + versionBuilder: versionBuilder, } v.commitBatcher = runner.NewSerialRunner(v.commitTXs) return v @@ -210,18 +231,6 @@ func (db *Vault[V]) unmapInterceptors(txIDs ...driver.TxID) (map[driver.TxID]TxI return result, nil } -type txCommitIndex struct { - ctx context.Context - txID driver.TxID - block driver.BlockNum - indexInBloc driver.TxNum -} - -type commitInput struct { - txCommitIndex - rws *ReadWriteSet -} - func (db *Vault[V]) CommitTX(ctx context.Context, txID driver.TxID, block driver.BlockNum, indexInBloc driver.TxNum) error { start := time.Now() newCtx, span := db.metrics.Vault.Start(ctx, "commit") @@ -298,7 +307,10 @@ func (db *Vault[V]) commitRWs(inputs ...commitInput) error { writes := make(map[driver.Namespace]map[driver.PKey]VersionedValue) for _, input := range inputs { for ns, ws := range input.rws.Writes { - vals := versionedValues(ws, input.block, input.indexInBloc) + vals, err := db.versionBuilder.VersionedValues(input.rws, ns, ws, input.block, input.indexInBloc) + if err != nil { + return errors.Wrapf(err, "failed to parse writes for txid %s", input.txID) + } if nsWrites, ok := writes[ns]; !ok { writes[ns] = vals } else { @@ -323,7 +335,10 @@ func (db *Vault[V]) commitRWs(inputs ...commitInput) error { metaWrites := make(map[driver.Namespace]map[driver.PKey]driver.VersionedMetadataValue) for _, input := range inputs { for ns, ws := range input.rws.MetaWrites { - vals := versionedMetaValues(ws, input.block, input.indexInBloc) + vals, err := db.versionBuilder.VersionedMetaValues(input.rws, ns, ws, input.block, input.indexInBloc) + if err != nil { + return errors.Wrapf(err, "failed to parse metadata writes for txid %s", input.txID) + } if nsWrites, ok := metaWrites[ns]; !ok { metaWrites[ns] = vals } else { @@ -396,22 +411,6 @@ func (db *Vault[V]) storeAllMetaWrites(metaWrites map[driver.Namespace]map[drive return errs } -func versionedValues(keyMap NamespaceWrites, block driver.BlockNum, indexInBloc driver.TxNum) map[driver.PKey]VersionedValue { - vals := make(map[driver.PKey]VersionedValue, len(keyMap)) - for pkey, val := range keyMap { - vals[pkey] = VersionedValue{Raw: val, Block: block, TxNum: indexInBloc} - } - return vals -} - -func versionedMetaValues(keyMap KeyedMetaWrites, block driver.BlockNum, indexInBloc driver.TxNum) map[driver.PKey]driver.VersionedMetadataValue { - vals := make(map[driver.PKey]driver.VersionedMetadataValue, len(keyMap)) - for pkey, val := range keyMap { - vals[pkey] = driver.VersionedMetadataValue{Metadata: val, Block: block, TxNum: indexInBloc} - } - return vals -} - func (db *Vault[V]) discard(ns driver.Namespace, block driver.BlockNum, indexInBloc driver.TxNum, errs map[driver.PKey]error) (bool, error) { if err1 := db.store.Discard(); err1 != nil { db.logger.Errorf("got error %v; discarding caused %s", errors2.Join(collections.Values(errs)...), err1.Error()) diff --git a/platform/common/core/generic/vault/vault_test.go b/platform/common/core/generic/vault/vault_test.go index 61edf0c69..dae3ee64f 100644 --- a/platform/common/core/generic/vault/vault_test.go +++ b/platform/common/core/generic/vault/vault_test.go @@ -43,6 +43,7 @@ func (p *testArtifactProvider) NewCachedVault(ddb VersionedPersistence) (*Vault[ &populator{}, &disabled.Provider{}, &noop.TracerProvider{}, + &BlockTxIndexVersionBuilder{}, ), nil } @@ -60,6 +61,7 @@ func (p *testArtifactProvider) NewNonCachedVault(ddb VersionedPersistence) (*Vau &populator{}, &disabled.Provider{}, &noop.TracerProvider{}, + &BlockTxIndexVersionBuilder{}, ), nil } @@ -80,6 +82,7 @@ func newInterceptor( txid, &VCProvider{}, &marshaller{}, + &BlockTxIndexVersionComparator{}, ) } @@ -112,11 +115,11 @@ func (m *marshaller) Append(destination *ReadWriteSet, raw []byte, nss ...string continue } for s, position := range reads { - b, t, in := destination.ReadSet.Get(ns, s) - if in && (b != position.Block || t != position.TxNum) { - return errors.Errorf("invalid read [%s:%s]: previous value returned at version %d:%d, current value at version %d:%d", ns, s, b, t, b, t) + v, in := destination.ReadSet.Get(ns, s) + if in && !Equal(position, v) { + return errors.Errorf("invalid read [%s:%s]: previous value returned at version [%v], current value at version [%v]", ns, s, position, v) } - destination.ReadSet.Add(ns, s, position.Block, position.TxNum) + destination.ReadSet.Add(ns, s, position) } } destination.OrderedReads = source.OrderedReads diff --git a/platform/common/core/generic/vault/version.go b/platform/common/core/generic/vault/version.go new file mode 100644 index 000000000..85eed3d57 --- /dev/null +++ b/platform/common/core/generic/vault/version.go @@ -0,0 +1,81 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package vault + +import ( + "bytes" + "encoding/binary" + + driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver" + "github.com/pkg/errors" +) + +var zeroVersion = []byte{0, 0, 0, 0, 0, 0, 0, 0} + +type BlockTxIndexVersionComparator struct{} + +func (b *BlockTxIndexVersionComparator) Equal(a, c driver2.RawVersion) bool { + return Equal(a, c) +} + +type BlockTxIndexVersionBuilder struct{} + +func (b *BlockTxIndexVersionBuilder) VersionedValues(rws *ReadWriteSet, ns driver2.Namespace, writes NamespaceWrites, block driver2.BlockNum, indexInBloc driver2.TxNum) (map[driver2.PKey]VersionedValue, error) { + vals := make(map[driver2.PKey]driver.VersionedValue, len(writes)) + for pkey, val := range writes { + vals[pkey] = driver.VersionedValue{Raw: val, Version: blockTxIndexToBytes(block, indexInBloc)} + } + return vals, nil +} + +func (b *BlockTxIndexVersionBuilder) VersionedMetaValues(rws *ReadWriteSet, ns driver2.Namespace, writes KeyedMetaWrites, block driver2.BlockNum, indexInBloc driver2.TxNum) (map[driver2.PKey]driver2.VersionedMetadataValue, error) { + vals := make(map[driver2.PKey]driver2.VersionedMetadataValue, len(writes)) + for pkey, val := range writes { + vals[pkey] = driver2.VersionedMetadataValue{Metadata: val, Version: blockTxIndexToBytes(block, indexInBloc)} + } + return vals, nil +} + +type BlockTxIndexVersionMarshaller struct{} + +func (m BlockTxIndexVersionMarshaller) FromBytes(data Version) (driver2.BlockNum, driver2.TxNum, error) { + if len(data) == 0 { + return 0, 0, nil + } + if len(data) != 8 { + return 0, 0, errors.Errorf("block number must be 8 bytes, but got %d", len(data)) + } + Block := driver2.BlockNum(binary.BigEndian.Uint32(data[:4])) + TxNum := driver2.TxNum(binary.BigEndian.Uint32(data[4:])) + return Block, TxNum, nil + +} + +func (m BlockTxIndexVersionMarshaller) ToBytes(bn driver2.BlockNum, txn driver2.TxNum) Version { + return blockTxIndexToBytes(bn, txn) +} + +func blockTxIndexToBytes(Block driver2.BlockNum, TxNum driver2.TxNum) []byte { + buf := make([]byte, 8) + binary.BigEndian.PutUint32(buf[:4], uint32(Block)) + binary.BigEndian.PutUint32(buf[4:], uint32(TxNum)) + return buf +} + +func Equal(a, c driver2.RawVersion) bool { + if bytes.Equal(a, c) { + return true + } + if len(a) == 0 && bytes.Equal(zeroVersion, c) { + return true + } + if len(c) == 0 && bytes.Equal(zeroVersion, a) { + return true + } + return false +} diff --git a/platform/common/driver/vault.go b/platform/common/driver/vault.go index f83775a60..bf6d0fd7c 100644 --- a/platform/common/driver/vault.go +++ b/platform/common/driver/vault.go @@ -13,22 +13,26 @@ import ( ) type ( - PKey = string - MKey = string - RawValue = []byte - Metadata = map[MKey][]byte + PKey = string + MKey = string + RawValue = []byte + Metadata = map[MKey][]byte + RawVersion = []byte ) type VersionedRead struct { - Key PKey - Raw RawValue - Block BlockNum - TxNum TxNum + Key PKey + Raw RawValue + Version RawVersion +} + +type VersionedValue struct { + Raw RawValue + Version RawVersion } type VersionedMetadataValue struct { - Block BlockNum - TxNum TxNum + Version RawVersion Metadata Metadata } @@ -36,7 +40,7 @@ type VersionedResultsIterator = collections.Iterator[*VersionedRead] type QueryExecutor interface { GetState(namespace Namespace, key PKey) (RawValue, error) - GetStateMetadata(namespace Namespace, key PKey) (Metadata, BlockNum, TxNum, error) + GetStateMetadata(namespace Namespace, key PKey) (Metadata, RawVersion, error) GetStateRangeScanIterator(namespace Namespace, startKey PKey, endKey PKey) (VersionedResultsIterator, error) Done() } diff --git a/platform/fabric/core/generic/vault/vault.go b/platform/fabric/core/generic/vault/vault.go index e4d376a3b..15e9162a7 100644 --- a/platform/fabric/core/generic/vault/vault.go +++ b/platform/fabric/core/generic/vault/vault.go @@ -48,6 +48,7 @@ func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProv &populator{}, metricsProvider, tracerProvider, + &vault.BlockTxIndexVersionBuilder{}, ) } @@ -59,10 +60,13 @@ func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDSt txID, &fdriver.ValidationCodeProvider{}, &marshaller{}, + &vault.BlockTxIndexVersionComparator{}, ) } -type populator struct{} +type populator struct { + versionMarshaller vault.BlockTxIndexVersionMarshaller +} func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error { txRWSet := &rwset.TxReadWriteSet{} @@ -92,7 +96,7 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa bn = read.Version.BlockNum txn = read.Version.TxNum } - rws.ReadSet.Add(ns, read.Key, bn, txn) + rws.ReadSet.Add(ns, read.Key, p.versionMarshaller.ToBytes(bn, txn)) } for _, write := range nsrws.KvRwSet.Writes { @@ -116,15 +120,21 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa return nil } -type marshaller struct{} +type marshaller struct { + versionMarshaller vault.BlockTxIndexVersionMarshaller +} func (m *marshaller) Marshal(rws *vault.ReadWriteSet) ([]byte, error) { rwsb := rwsetutil.NewRWSetBuilder() for ns, keyMap := range rws.Reads { for key, v := range keyMap { - if v.Block != 0 || v.TxNum != 0 { - rwsb.AddToReadSet(ns, key, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: v.Block, TxNum: v.TxNum})) + block, txNum, err := m.versionMarshaller.FromBytes(v) + if err != nil { + return nil, errors.Wrapf(err, "failed to extract block version from bytes [%v]", v) + } + if block != 0 || txNum != 0 { + rwsb.AddToReadSet(ns, key, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: block, TxNum: txNum})) } else { rwsb.AddToReadSet(ns, key, nil) } @@ -175,13 +185,15 @@ func (m *marshaller) Append(destination *vault.ReadWriteSet, raw []byte, nss ... bnum = read.Version.BlockNum txnum = read.Version.TxNum } - - b, t, in := destination.ReadSet.Get(ns, read.Key) + dVersion, in := destination.ReadSet.Get(ns, read.Key) + b, t, err := m.versionMarshaller.FromBytes(dVersion) + if err != nil { + return errors.Wrapf(err, "failed to extract block version from bytes [%v]", dVersion) + } if in && (b != bnum || t != txnum) { - return errors.Errorf("invalid read [%s:%s]: previous value returned at version %d:%d, current value at version %d:%d", ns, read.Key, b, t, b, txnum) + return errors.Errorf("invalid read [%s:%s]: previous value returned at version [%v], current value at version [%v]", ns, read.Key, m.versionMarshaller.ToBytes(bnum, txnum), dVersion) } - - destination.ReadSet.Add(ns, read.Key, bnum, txnum) + destination.ReadSet.Add(ns, read.Key, dVersion) } for _, write := range nsrws.KvRwSet.Writes { diff --git a/platform/orion/core/generic/vault/vault.go b/platform/orion/core/generic/vault/vault.go index b8a8f1ec2..1aaf2d1c4 100644 --- a/platform/orion/core/generic/vault/vault.go +++ b/platform/orion/core/generic/vault/vault.go @@ -41,6 +41,7 @@ func New(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProvider &populator{}, metricsProvider, tracerProvider, + &vault.BlockTxIndexVersionBuilder{}, ) } @@ -61,6 +62,7 @@ func newInterceptor( txid, &odriver.ValidationCodeProvider{}, nil, + &vault.BlockTxIndexVersionComparator{}, )} } @@ -82,7 +84,9 @@ func (i *Interceptor) Equals(other interface{}, nss ...string) error { return errors.Errorf("cannot compare to the passed value [%v]", other) } -type populator struct{} +type populator struct { + versionMarshaller vault.BlockTxIndexVersionMarshaller +} func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error { txRWSet := &types.DataTx{} @@ -103,8 +107,10 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa rws.ReadSet.Add( operation.DbName, read.Key, - bn, - txn, + p.versionMarshaller.ToBytes( + bn, + txn, + ), ) } diff --git a/platform/view/services/db/dbtest/helpers.go b/platform/view/services/db/dbtest/helpers.go index 8affad0b3..5eeac450a 100644 --- a/platform/view/services/db/dbtest/helpers.go +++ b/platform/view/services/db/dbtest/helpers.go @@ -8,6 +8,7 @@ package dbtest import ( "database/sql" + "encoding/binary" "fmt" "strings" "sync" @@ -16,7 +17,9 @@ import ( "unicode/utf8" errors2 "github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors" + driver3 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils" + driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/keys" "github.com/pkg/errors" @@ -107,10 +110,10 @@ func TTestRangeQueries(t *testing.T, db driver.TransactionalVersionedPersistence } assert.Len(t, res, 4) assert.Equal(t, []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, TxNum: 4}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, TxNum: 1}, - {Key: "k3", Raw: []byte("k3_value"), Block: 35, TxNum: 2}, + {Key: "k1", Raw: []byte("k1_value"), Version: ToBytes(35, 3)}, + {Key: "k111", Raw: []byte("k111_value"), Version: ToBytes(35, 4)}, + {Key: "k2", Raw: []byte("k2_value"), Version: ToBytes(35, 1)}, + {Key: "k3", Raw: []byte("k3_value"), Version: ToBytes(35, 2)}, }, res) itr, err = db.GetStateRangeScanIterator(ns, "k1", "k3") @@ -125,9 +128,9 @@ func TTestRangeQueries(t *testing.T, db driver.TransactionalVersionedPersistence res = append(res, *n) } expected := []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, TxNum: 4}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, TxNum: 1}, + {Key: "k1", Raw: []byte("k1_value"), Version: ToBytes(35, 3)}, + {Key: "k111", Raw: []byte("k111_value"), Version: ToBytes(35, 4)}, + {Key: "k2", Raw: []byte("k2_value"), Version: ToBytes(35, 1)}, } assert.Len(t, res, 3) assert.Equal(t, expected, res) @@ -144,16 +147,16 @@ func TTestRangeQueries(t *testing.T, db driver.TransactionalVersionedPersistence } itr.Close() expected = []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, TxNum: 4}, + {Key: "k1", Raw: []byte("k1_value"), Version: ToBytes(35, 3)}, + {Key: "k111", Raw: []byte("k111_value"), Version: ToBytes(35, 4)}, } assert.Len(t, res, 2) assert.Equal(t, expected, res) expected = []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, TxNum: 4}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, TxNum: 1}, + {Key: "k1", Raw: []byte("k1_value"), Version: ToBytes(35, 3)}, + {Key: "k111", Raw: []byte("k111_value"), Version: ToBytes(35, 4)}, + {Key: "k2", Raw: []byte("k2_value"), Version: ToBytes(35, 1)}, } itr, err = db.GetStateRangeScanIterator(ns, "k1", "k3") assert.NoError(t, err) @@ -168,8 +171,8 @@ func TTestRangeQueries(t *testing.T, db driver.TransactionalVersionedPersistence assert.Equal(t, expected, res) expected = []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k3", Raw: []byte("k3_value"), Block: 35, TxNum: 2}, + {Key: "k1", Raw: []byte("k1_value"), Version: ToBytes(35, 3)}, + {Key: "k3", Raw: []byte("k3_value"), Version: ToBytes(35, 2)}, } itr, err = db.GetStateSetIterator(ns, "k1", "k3") assert.NoError(t, err) @@ -181,7 +184,9 @@ func TTestRangeQueries(t *testing.T, db driver.TransactionalVersionedPersistence res = append(res, *n) } assert.Len(t, res, 2) - assert.Equal(t, expected, res) + for _, read := range expected { + assert.Contains(t, res, read) + } } func TTestMeta(t *testing.T, db driver.TransactionalVersionedPersistence) { @@ -191,7 +196,7 @@ func TTestMeta(t *testing.T, db driver.TransactionalVersionedPersistence) { err := db.BeginUpdate() assert.NoError(t, err) - err = db.SetState(ns, key, driver.VersionedValue{Raw: []byte("val"), Block: 35, TxNum: 1}) + err = db.SetState(ns, key, driver.VersionedValue{Raw: []byte("val"), Version: ToBytes(35, 1)}) assert.NoError(t, err) err = db.Commit() @@ -199,18 +204,20 @@ func TTestMeta(t *testing.T, db driver.TransactionalVersionedPersistence) { vv, err := db.GetState(ns, key) assert.NoError(t, err) - assert.Equal(t, driver.VersionedValue{Raw: []byte("val"), Block: 35, TxNum: 1}, vv) + assert.Equal(t, driver.VersionedValue{Raw: []byte("val"), Version: ToBytes(35, 1)}, vv) - m, bn, tn, err := db.GetStateMetadata(ns, key) + m, ver, err := db.GetStateMetadata(ns, key) assert.NoError(t, err) assert.Len(t, m, 0) + bn, tn, err := FromBytes(ver) + assert.NoError(t, err) assert.Equal(t, uint64(35), bn) assert.Equal(t, uint64(1), tn) err = db.BeginUpdate() assert.NoError(t, err) - err = db.SetStateMetadata(ns, key, map[string][]byte{"foo": []byte("bar")}, 36, 2) + err = db.SetStateMetadata(ns, key, map[string][]byte{"foo": []byte("bar")}, ToBytes(36, 2)) assert.NoError(t, err) err = db.Commit() @@ -218,9 +225,11 @@ func TTestMeta(t *testing.T, db driver.TransactionalVersionedPersistence) { vv, err = db.GetState(ns, key) assert.NoError(t, err) - assert.Equal(t, driver.VersionedValue{Raw: []byte("val"), Block: 36, TxNum: 2}, vv) + assert.Equal(t, driver.VersionedValue{Raw: []byte("val"), Version: ToBytes(36, 2)}, vv) - m, bn, tn, err = db.GetStateMetadata(ns, key) + m, ver, err = db.GetStateMetadata(ns, key) + assert.NoError(t, err) + bn, tn, err = FromBytes(ver) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"foo": []byte("bar")}, m) assert.Equal(t, uint64(36), bn) @@ -237,7 +246,9 @@ func TTestSimpleReadWrite(t *testing.T, db driver.TransactionalVersionedPersiste assert.Equal(t, driver.VersionedValue{}, vv) // empty metadata - m, bn, tn, err := db.GetStateMetadata(ns, key) + m, ver, err := db.GetStateMetadata(ns, key) + assert.NoError(t, err) + bn, tn, err := FromBytes(ver) assert.NoError(t, err) assert.Len(t, m, 0) assert.Equal(t, uint64(0), bn) @@ -246,7 +257,7 @@ func TTestSimpleReadWrite(t *testing.T, db driver.TransactionalVersionedPersiste // add data err = db.BeginUpdate() assert.NoError(t, err) - err = db.SetState(ns, key, driver.VersionedValue{Raw: []byte("val"), Block: 35, TxNum: 1}) + err = db.SetState(ns, key, driver.VersionedValue{Raw: []byte("val"), Version: ToBytes(35, 1)}) assert.NoError(t, err) err = db.Commit() assert.NoError(t, err) @@ -254,30 +265,30 @@ func TTestSimpleReadWrite(t *testing.T, db driver.TransactionalVersionedPersiste // get data vv, err = db.GetState(ns, key) assert.NoError(t, err) - assert.Equal(t, driver.VersionedValue{Raw: []byte("val"), Block: 35, TxNum: 1}, vv) + assert.Equal(t, driver.VersionedValue{Raw: []byte("val"), Version: ToBytes(35, 1)}, vv) // logging because this can cause a deadlock if maxOpenConnections is only 1 t.Logf("get state [%s] during set state tx", key) err = db.BeginUpdate() assert.NoError(t, err) - err = db.SetState(ns, key, driver.VersionedValue{Raw: []byte("val1"), Block: 36, TxNum: 2}) + err = db.SetState(ns, key, driver.VersionedValue{Raw: []byte("val1"), Version: ToBytes(36, 2)}) assert.NoError(t, err) vv, err = db.GetState(ns, key) assert.NoError(t, err) - assert.Equal(t, driver.VersionedValue{Raw: []byte("val"), Block: 35, TxNum: 1}, vv) + assert.Equal(t, driver.VersionedValue{Raw: []byte("val"), Version: ToBytes(35, 1)}, vv) err = db.Commit() assert.NoError(t, err) t.Logf("get state after tx [%s]", key) vv, err = db.GetState(ns, key) assert.NoError(t, err) - assert.Equal(t, driver.VersionedValue{Raw: []byte("val1"), Block: 36, TxNum: 2}, vv) + assert.Equal(t, driver.VersionedValue{Raw: []byte("val1"), Version: ToBytes(36, 2)}, vv) // Discard an update err = db.BeginUpdate() assert.NoError(t, err) - err = db.SetState(ns, key, driver.VersionedValue{Raw: []byte("val0"), Block: 37, TxNum: 3}) + err = db.SetState(ns, key, driver.VersionedValue{Raw: []byte("val0"), Version: ToBytes(37, 3)}) assert.NoError(t, err) err = db.Discard() assert.NoError(t, err) @@ -285,7 +296,7 @@ func TTestSimpleReadWrite(t *testing.T, db driver.TransactionalVersionedPersiste // Expect state to be same as before the rollback vv, err = db.GetState(ns, key) assert.NoError(t, err) - assert.Equal(t, driver.VersionedValue{Raw: []byte("val1"), Block: 36, TxNum: 2}, vv) + assert.Equal(t, driver.VersionedValue{Raw: []byte("val1"), Version: ToBytes(36, 2)}, vv) // delete state err = db.BeginUpdate() @@ -305,10 +316,10 @@ func populateDB(t *testing.T, db driver.TransactionalVersionedPersistence, ns, k err := db.BeginUpdate() assert.NoError(t, err) - err = db.SetState(ns, key, driver.VersionedValue{Raw: []byte("bar"), Block: 1, TxNum: 1}) + err = db.SetState(ns, key, driver.VersionedValue{Raw: []byte("bar"), Version: ToBytes(1, 1)}) assert.NoError(t, err) - err = db.SetState(ns, keyWithSuffix, driver.VersionedValue{Raw: []byte("bar1"), Block: 1, TxNum: 1}) + err = db.SetState(ns, keyWithSuffix, driver.VersionedValue{Raw: []byte("bar1"), Version: ToBytes(1, 1)}) assert.NoError(t, err) err = db.Commit() @@ -316,11 +327,11 @@ func populateDB(t *testing.T, db driver.TransactionalVersionedPersistence, ns, k vv, err := db.GetState(ns, key) assert.NoError(t, err) - assert.Equal(t, driver.VersionedValue{Raw: []byte("bar"), Block: 1, TxNum: 1}, vv) + assert.Equal(t, driver.VersionedValue{Raw: []byte("bar"), Version: ToBytes(1, 1)}, vv) vv, err = db.GetState(ns, keyWithSuffix) assert.NoError(t, err) - assert.Equal(t, driver.VersionedValue{Raw: []byte("bar1"), Block: 1, TxNum: 1}, vv) + assert.Equal(t, driver.VersionedValue{Raw: []byte("bar1"), Version: ToBytes(1, 1)}, vv) vv, err = db.GetState(ns, "barf") assert.NoError(t, err) @@ -335,13 +346,13 @@ func populateForRangeQueries(t *testing.T, db driver.TransactionalVersionedPersi err := db.BeginUpdate() assert.NoError(t, err) - err = db.SetState(ns, "k2", driver.VersionedValue{Raw: []byte("k2_value"), Block: 35, TxNum: 1}) + err = db.SetState(ns, "k2", driver.VersionedValue{Raw: []byte("k2_value"), Version: ToBytes(35, 1)}) assert.NoError(t, err) - err = db.SetState(ns, "k3", driver.VersionedValue{Raw: []byte("k3_value"), Block: 35, TxNum: 2}) + err = db.SetState(ns, "k3", driver.VersionedValue{Raw: []byte("k3_value"), Version: ToBytes(35, 2)}) assert.NoError(t, err) - err = db.SetState(ns, "k1", driver.VersionedValue{Raw: []byte("k1_value"), Block: 35, TxNum: 3}) + err = db.SetState(ns, "k1", driver.VersionedValue{Raw: []byte("k1_value"), Version: ToBytes(35, 3)}) assert.NoError(t, err) - err = db.SetState(ns, "k111", driver.VersionedValue{Raw: []byte("k111_value"), Block: 35, TxNum: 4}) + err = db.SetState(ns, "k111", driver.VersionedValue{Raw: []byte("k111_value"), Version: ToBytes(35, 4)}) assert.NoError(t, err) err = db.Commit() @@ -361,20 +372,24 @@ func TTestMetadata(t *testing.T, db driver.TransactionalVersionedPersistence) { ns := "namespace" key := "foo" - md, bn, txn, err := db.GetStateMetadata(ns, key) + md, ver, err := db.GetStateMetadata(ns, key) assert.NoError(t, err) assert.Nil(t, md) + bn, txn, err := FromBytes(ver) + assert.NoError(t, err) assert.Equal(t, uint64(0x0), bn) assert.Equal(t, uint64(0x0), txn) err = db.BeginUpdate() assert.NoError(t, err) - err = db.SetStateMetadata(ns, key, map[string][]byte{"foo": []byte("bar")}, 35, 1) + err = db.SetStateMetadata(ns, key, map[string][]byte{"foo": []byte("bar")}, ToBytes(35, 1)) assert.NoError(t, err) err = db.Commit() assert.NoError(t, err) - md, bn, txn, err = db.GetStateMetadata(ns, key) + md, ver, err = db.GetStateMetadata(ns, key) + assert.NoError(t, err) + bn, txn, err = FromBytes(ver) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"foo": []byte("bar")}, md) assert.Equal(t, uint64(35), bn) @@ -382,12 +397,14 @@ func TTestMetadata(t *testing.T, db driver.TransactionalVersionedPersistence) { err = db.BeginUpdate() assert.NoError(t, err) - err = db.SetStateMetadata(ns, key, map[string][]byte{"foo1": []byte("bar1")}, 36, 2) + err = db.SetStateMetadata(ns, key, map[string][]byte{"foo1": []byte("bar1")}, ToBytes(36, 2)) assert.NoError(t, err) err = db.Commit() assert.NoError(t, err) - md, bn, txn, err = db.GetStateMetadata(ns, key) + md, ver, err = db.GetStateMetadata(ns, key) + assert.NoError(t, err) + bn, txn, err = FromBytes(ver) assert.NoError(t, err) assert.Equal(t, map[string][]byte{"foo1": []byte("bar1")}, md) assert.Equal(t, uint64(36), bn) @@ -440,13 +457,13 @@ func TTestRangeQueries1(t *testing.T, db driver.TransactionalVersionedPersistenc err := db.BeginUpdate() assert.NoError(t, err) - err = db.SetState(ns, "k2", driver.VersionedValue{Raw: []byte("k2_value"), Block: 35, TxNum: 1}) + err = db.SetState(ns, "k2", driver.VersionedValue{Raw: []byte("k2_value"), Version: ToBytes(35, 1)}) assert.NoError(t, err) - err = db.SetState(ns, "k3", driver.VersionedValue{Raw: []byte("k3_value"), Block: 35, TxNum: 2}) + err = db.SetState(ns, "k3", driver.VersionedValue{Raw: []byte("k3_value"), Version: ToBytes(35, 2)}) assert.NoError(t, err) - err = db.SetState(ns, "k1", driver.VersionedValue{Raw: []byte("k1_value"), Block: 35, TxNum: 3}) + err = db.SetState(ns, "k1", driver.VersionedValue{Raw: []byte("k1_value"), Version: ToBytes(35, 3)}) assert.NoError(t, err) - err = db.SetState(ns, "k111", driver.VersionedValue{Raw: []byte("k111_value"), Block: 35, TxNum: 4}) + err = db.SetState(ns, "k111", driver.VersionedValue{Raw: []byte("k111_value"), Version: ToBytes(35, 4)}) assert.NoError(t, err) err = db.Commit() @@ -463,10 +480,10 @@ func TTestRangeQueries1(t *testing.T, db driver.TransactionalVersionedPersistenc } assert.Len(t, res, 4) assert.Equal(t, []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, TxNum: 4}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, TxNum: 1}, - {Key: "k3", Raw: []byte("k3_value"), Block: 35, TxNum: 2}, + {Key: "k1", Raw: []byte("k1_value"), Version: ToBytes(35, 3)}, + {Key: "k111", Raw: []byte("k111_value"), Version: ToBytes(35, 4)}, + {Key: "k2", Raw: []byte("k2_value"), Version: ToBytes(35, 1)}, + {Key: "k3", Raw: []byte("k3_value"), Version: ToBytes(35, 2)}, }, res) itr, err = db.GetStateRangeScanIterator(ns, "k1", "k3") @@ -480,9 +497,9 @@ func TTestRangeQueries1(t *testing.T, db driver.TransactionalVersionedPersistenc } assert.Len(t, res, 3) assert.Equal(t, []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, TxNum: 4}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, TxNum: 1}, + {Key: "k1", Raw: []byte("k1_value"), Version: ToBytes(35, 3)}, + {Key: "k111", Raw: []byte("k111_value"), Version: ToBytes(35, 4)}, + {Key: "k2", Raw: []byte("k2_value"), Version: ToBytes(35, 1)}, }, res) } @@ -493,13 +510,13 @@ func TTestMultiWritesAndRangeQueries(t *testing.T, db driver.TransactionalVersio t.Fatal(err) } - err = db.SetState(ns, "k2", driver.VersionedValue{Raw: []byte("k2_value"), Block: 35, TxNum: 1}) + err = db.SetState(ns, "k2", driver.VersionedValue{Raw: []byte("k2_value"), Version: ToBytes(35, 1)}) assert.NoError(t, err) - err = db.SetState(ns, "k3", driver.VersionedValue{Raw: []byte("k3_value"), Block: 35, TxNum: 2}) + err = db.SetState(ns, "k3", driver.VersionedValue{Raw: []byte("k3_value"), Version: ToBytes(35, 2)}) assert.NoError(t, err) - err = db.SetState(ns, "k1", driver.VersionedValue{Raw: []byte("k1_value"), Block: 35, TxNum: 3}) + err = db.SetState(ns, "k1", driver.VersionedValue{Raw: []byte("k1_value"), Version: ToBytes(35, 3)}) assert.NoError(t, err) - err = db.SetState(ns, "k111", driver.VersionedValue{Raw: []byte("k111_value"), Block: 35, TxNum: 4}) + err = db.SetState(ns, "k111", driver.VersionedValue{Raw: []byte("k111_value"), Version: ToBytes(35, 4)}) assert.NoError(t, err) err = db.Commit() @@ -538,10 +555,10 @@ func TTestMultiWritesAndRangeQueries(t *testing.T, db driver.TransactionalVersio } assert.Len(t, res, 4) assert.Equal(t, []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, TxNum: 4}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, TxNum: 1}, - {Key: "k3", Raw: []byte("k3_value"), Block: 35, TxNum: 2}, + {Key: "k1", Raw: []byte("k1_value"), Version: ToBytes(35, 3)}, + {Key: "k111", Raw: []byte("k111_value"), Version: ToBytes(35, 4)}, + {Key: "k2", Raw: []byte("k2_value"), Version: ToBytes(35, 1)}, + {Key: "k3", Raw: []byte("k3_value"), Version: ToBytes(35, 2)}, }, res) itr, err = db.GetStateRangeScanIterator(ns, "k1", "k3") @@ -554,9 +571,9 @@ func TTestMultiWritesAndRangeQueries(t *testing.T, db driver.TransactionalVersio res = append(res, *n) } expected := []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, TxNum: 4}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, TxNum: 1}, + {Key: "k1", Raw: []byte("k1_value"), Version: ToBytes(35, 3)}, + {Key: "k111", Raw: []byte("k111_value"), Version: ToBytes(35, 4)}, + {Key: "k2", Raw: []byte("k2_value"), Version: ToBytes(35, 1)}, } assert.Len(t, res, 3) assert.Equal(t, expected, res) @@ -572,16 +589,16 @@ func TTestMultiWritesAndRangeQueries(t *testing.T, db driver.TransactionalVersio res = append(res, *n) } expected = []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, TxNum: 4}, + {Key: "k1", Raw: []byte("k1_value"), Version: ToBytes(35, 3)}, + {Key: "k111", Raw: []byte("k111_value"), Version: ToBytes(35, 4)}, } assert.Len(t, res, 2) assert.Equal(t, expected, res) expected = []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, TxNum: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, TxNum: 4}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, TxNum: 1}, + {Key: "k1", Raw: []byte("k1_value"), Version: ToBytes(35, 3)}, + {Key: "k111", Raw: []byte("k111_value"), Version: ToBytes(35, 4)}, + {Key: "k2", Raw: []byte("k2_value"), Version: ToBytes(35, 1)}, } itr, err = db.GetStateRangeScanIterator(ns, "k1", "k3") assert.NoError(t, err) @@ -621,7 +638,7 @@ func TTestMultiWrites(t *testing.T, db driver.TransactionalVersionedPersistence) func write(t *testing.T, db driver.TransactionalVersionedPersistence, ns, key string, value []byte, block, txnum uint64) { tx, err := db.NewWriteTransaction() assert.NoError(t, err) - err = tx.SetState(ns, key, driver.VersionedValue{Raw: value, Block: block, TxNum: txnum}) + err = tx.SetState(ns, key, driver.VersionedValue{Raw: value, Version: ToBytes(block, txnum)}) assert.NoError(t, err) err = tx.Commit() assert.NoError(t, err) @@ -675,7 +692,7 @@ func TTestCompositeKeys(t *testing.T, db driver.TransactionalVersionedPersistenc } { k, err := createCompositeKey(keyPrefix, comps) assert.NoError(t, err) - err = db.SetState(ns, k, driver.VersionedValue{Raw: []byte(k), Block: 35, TxNum: 1}) + err = db.SetState(ns, k, driver.VersionedValue{Raw: []byte(k), Version: ToBytes(35, 1)}) assert.NoError(t, err) } @@ -698,10 +715,10 @@ func TTestCompositeKeys(t *testing.T, db driver.TransactionalVersionedPersistenc } assert.Len(t, res, 4) assert.Equal(t, []driver.VersionedRead{ - {Key: "\x00prefix0a0b0", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x62, 0x30}, Block: 0x23, TxNum: 1}, - {Key: "\x00prefix0a0b010", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x62, 0x30, 0x31, 0x30}, Block: 0x23, TxNum: 1}, - {Key: "\x00prefix0a0b030", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x62, 0x30, 0x33, 0x30}, Block: 0x23, TxNum: 1}, - {Key: "\x00prefix0a0d0", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x64, 0x30}, Block: 0x23, TxNum: 1}, + {Key: "\x00prefix0a0b0", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x62, 0x30}, Version: ToBytes(0x23, 1)}, + {Key: "\x00prefix0a0b010", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x62, 0x30, 0x31, 0x30}, Version: ToBytes(0x23, 1)}, + {Key: "\x00prefix0a0b030", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x62, 0x30, 0x33, 0x30}, Version: ToBytes(0x23, 1)}, + {Key: "\x00prefix0a0d0", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x64, 0x30}, Version: ToBytes(0x23, 1)}, }, res) partialCompositeKey, err = createCompositeKey(keyPrefix, []string{"a", "b"}) @@ -720,9 +737,9 @@ func TTestCompositeKeys(t *testing.T, db driver.TransactionalVersionedPersistenc } assert.Len(t, res, 3) assert.Equal(t, []driver.VersionedRead{ - {Key: "\x00prefix0a0b0", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x62, 0x30}, Block: 0x23, TxNum: 1}, - {Key: "\x00prefix0a0b010", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x62, 0x30, 0x31, 0x30}, Block: 0x23, TxNum: 1}, - {Key: "\x00prefix0a0b030", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x62, 0x30, 0x33, 0x30}, Block: 0x23, TxNum: 1}, + {Key: "\x00prefix0a0b0", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x62, 0x30}, Version: ToBytes(0x23, 1)}, + {Key: "\x00prefix0a0b010", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x62, 0x30, 0x31, 0x30}, Version: ToBytes(0x23, 1)}, + {Key: "\x00prefix0a0b030", Raw: []uint8{0x0, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x30, 0x61, 0x30, 0x62, 0x30, 0x33, 0x30}, Version: ToBytes(0x23, 1)}, }, res) } @@ -852,10 +869,13 @@ func TTestUnversionedRange(t *testing.T, db driver.UnversionedPersistence) { res = append(res, *n) } assert.Len(t, res, 2) - assert.Equal(t, []driver.UnversionedRead{ + expected := []driver.UnversionedRead{ {Key: "k1", Raw: []byte("k1_value")}, {Key: "k2", Raw: []byte("k2_value")}, - }, res) + } + for _, read := range expected { + assert.Contains(t, res, read) + } } func TTestUnversionedSimple(t *testing.T, db driver.UnversionedPersistence) { @@ -1129,3 +1149,22 @@ func subscribe(db notifier) (chan notifyEvent, error) { } return ch, nil } + +func ToBytes(Block driver2.BlockNum, TxNum driver2.TxNum) []byte { + buf := make([]byte, 8) + binary.BigEndian.PutUint32(buf[:4], uint32(Block)) + binary.BigEndian.PutUint32(buf[4:], uint32(TxNum)) + return buf +} + +func FromBytes(data driver3.RawVersion) (driver2.BlockNum, driver2.TxNum, error) { + if len(data) == 0 { + return 0, 0, nil + } + if len(data) != 8 { + return 0, 0, errors.Errorf("block number must be 8 bytes, but got %d", len(data)) + } + Block := driver2.BlockNum(binary.BigEndian.Uint32(data[:4])) + TxNum := driver2.TxNum(binary.BigEndian.Uint32(data[4:])) + return Block, TxNum, nil +} diff --git a/platform/view/services/db/driver/badger/badger.go b/platform/view/services/db/driver/badger/badger.go index e65d0eea6..8de574841 100644 --- a/platform/view/services/db/driver/badger/badger.go +++ b/platform/view/services/db/driver/badger/badger.go @@ -102,33 +102,44 @@ func (db *DB) Close() error { func (db *DB) SetState(namespace driver2.Namespace, key string, value driver.VersionedValue) error { if len(value.Raw) == 0 { - logger.Warnf("set key [%s:%d:%d] to nil value, will be deleted instead", key, value.Block, value.TxNum) + logger.Warnf("set key [%s:%v] to nil value, will be deleted instead", key, value.Version) return db.DeleteState(namespace, key) } + return db.setValues(namespace, key, value.Raw, nil, value.Version) +} + +func (db *DB) SetStateMetadata(namespace driver2.Namespace, key driver2.PKey, metadata driver2.Metadata, version driver2.RawVersion) error { + return db.setValues(namespace, key, nil, metadata, version) +} +func (db *DB) setValues(namespace driver2.Namespace, key driver2.PKey, value driver2.RawValue, metadata driver2.Metadata, version driver2.RawVersion) error { if db.Txn == nil { panic("programming error, writing without ongoing update") } - dbKey := dbKey(namespace, key) + k := dbKey(namespace, key) - v, err := txVersionedValue(db.Txn, dbKey) + v, err := txVersionedValue(db.Txn, k) if err != nil { return err } - v.Value = value.Raw - v.Block = value.Block - v.Txnum = value.TxNum + if value != nil { + v.Value = value + } + if metadata != nil { + v.Meta = metadata + } + v.KeyVersion = version bytes, err := proto.Marshal(v) if err != nil { - return errors.Wrapf(err, "could not marshal VersionedValue for key %s", dbKey) + return errors.Wrapf(err, "could not marshal VersionedValue for key %s", k) } - err = db.Txn.Set([]byte(dbKey), bytes) + err = db.Txn.Set([]byte(k), bytes) if err != nil { - return errors.Wrapf(err, "could not set value for key %s", dbKey) + return errors.Wrapf(err, "could not set value for key %s", k) } return nil @@ -144,39 +155,10 @@ func (db *DB) SetStates(namespace driver2.Namespace, kvs map[driver2.PKey]driver return errs } -func (db *DB) SetStateMetadata(namespace driver2.Namespace, key driver2.PKey, metadata driver2.Metadata, block driver2.BlockNum, txnum driver2.TxNum) error { - if db.Txn == nil { - panic("programming error, writing without ongoing update") - } - - dbKey := dbKey(namespace, key) - - v, err := txVersionedValue(db.Txn, dbKey) - if err != nil { - return err - } - - v.Meta = metadata - v.Block = block - v.Txnum = txnum - - bytes, err := proto.Marshal(v) - if err != nil { - return errors.Wrapf(err, "could not marshal VersionedValue for key %s", dbKey) - } - - err = db.Txn.Set([]byte(dbKey), bytes) - if err != nil { - return errors.Wrapf(err, "could not set value for key %s", dbKey) - } - - return nil -} - -func (db *DB) SetStateMetadatas(ns driver2.Namespace, kvs map[driver2.PKey]driver2.VersionedMetadataValue) map[driver2.PKey]error { +func (db *DB) SetStateMetadatas(ns driver2.Namespace, kvs map[driver2.PKey]driver.VersionedMetadataValue) map[driver2.PKey]error { errs := make(map[driver2.PKey]error) for pkey, value := range kvs { - if err := db.SetStateMetadata(ns, pkey, value.Metadata, value.Block, value.TxNum); err != nil { + if err := db.SetStateMetadata(ns, pkey, value.Metadata, value.Version); err != nil { errs[pkey] = err } } @@ -219,7 +201,7 @@ func (db *DB) GetState(namespace driver2.Namespace, key driver2.PKey) (driver.Ve return driver.VersionedValue{}, err } - return driver.VersionedValue{Raw: v.Value, Block: v.Block, TxNum: v.Txnum}, err + return driver.VersionedValue{Raw: v.Value, Version: v.KeyVersion}, err } func (db *DB) GetStateSetIterator(ns driver2.Namespace, keys ...driver2.PKey) (driver.VersionedResultsIterator, error) { @@ -230,16 +212,15 @@ func (db *DB) GetStateSetIterator(ns driver2.Namespace, keys ...driver2.PKey) (d return nil, err } reads[i] = &driver.VersionedRead{ - Key: key, - Raw: vv.Raw, - Block: vv.Block, - TxNum: vv.TxNum, + Key: key, + Raw: vv.Raw, + Version: vv.Version, } } return &keys2.DummyVersionedIterator{Items: reads}, nil } -func (db *DB) GetStateMetadata(namespace driver2.Namespace, key driver2.PKey) (driver2.Metadata, driver2.BlockNum, driver2.TxNum, error) { +func (db *DB) GetStateMetadata(namespace driver2.Namespace, key driver2.PKey) (driver2.Metadata, driver2.RawVersion, error) { dbKey := dbKey(namespace, key) txn := &Txn{db.db.NewTransaction(false)} @@ -247,10 +228,10 @@ func (db *DB) GetStateMetadata(namespace driver2.Namespace, key driver2.PKey) (d v, err := txVersionedValue(txn, dbKey) if err != nil { - return nil, 0, 0, err + return nil, nil, err } - return v.Meta, v.Block, v.Txnum, nil + return v.Meta, v.KeyVersion, nil } func (db *DB) NewWriteTransaction() (driver.WriteTransaction, error) { @@ -292,8 +273,7 @@ func (w *WriteTransaction) SetState(namespace driver2.Namespace, key driver2.PKe } v.Value = value.Raw - v.Block = value.Block - v.Txnum = value.TxNum + v.KeyVersion = value.Version bytes, err := proto.Marshal(v) if err != nil { diff --git a/platform/view/services/db/driver/badger/badger_test.go b/platform/view/services/db/driver/badger/badger_test.go index a5e1d03c5..4757fe27c 100644 --- a/platform/view/services/db/driver/badger/badger_test.go +++ b/platform/view/services/db/driver/badger/badger_test.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package badger import ( + "encoding/binary" "fmt" "os" "path/filepath" @@ -17,6 +18,7 @@ import ( "github.com/dgraph-io/badger/v3" "github.com/golang/protobuf/proto" + cdriver "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/dbtest" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver/badger/mock" @@ -102,9 +104,12 @@ func TestMarshallingErrors(t *testing.T) { assert.Contains(t, err.Error(), "could not unmarshal VersionedValue for key ") assert.Equal(t, driver.VersionedValue{}, vv) - m, bn, tn, err := db.GetStateMetadata(ns, key) + m, ver, err := db.GetStateMetadata(ns, key) assert.Contains(t, err.Error(), "could not unmarshal VersionedValue for key") assert.Len(t, m, 0) + versionMarshaller := BlockTxIndexVersionMarshaller{} + bn, tn, err := versionMarshaller.FromBytes(ver) + assert.NoError(t, err) assert.Equal(t, uint64(0), bn) assert.Equal(t, uint64(0), tn) @@ -122,8 +127,10 @@ func TestMarshallingErrors(t *testing.T) { assert.EqualError(t, err, "could not get value for key ns\x00key: invalid version, expected 1, got 34") assert.Equal(t, driver.VersionedValue{}, vv) - m, bn, tn, err = db.GetStateMetadata(ns, key) + m, ver, err = db.GetStateMetadata(ns, key) assert.EqualError(t, err, "could not get value for key ns\x00key: invalid version, expected 1, got 34") + bn, tn, err = versionMarshaller.FromBytes(ver) + assert.NoError(t, err) assert.Len(t, m, 0) assert.Equal(t, uint64(0), bn) assert.Equal(t, uint64(0), tn) @@ -280,3 +287,29 @@ func BenchmarkBuilder(b *testing.B) { } result = s } + +type BlockTxIndexVersionMarshaller struct{} + +func (m BlockTxIndexVersionMarshaller) FromBytes(data cdriver.RawVersion) (cdriver.BlockNum, cdriver.TxNum, error) { + if len(data) == 0 { + return 0, 0, nil + } + if len(data) != 8 { + return 0, 0, errors.Errorf("block number must be 8 bytes, but got %d", len(data)) + } + Block := cdriver.BlockNum(binary.BigEndian.Uint32(data[:4])) + TxNum := cdriver.TxNum(binary.BigEndian.Uint32(data[4:])) + return Block, TxNum, nil + +} + +func (m BlockTxIndexVersionMarshaller) ToBytes(bn cdriver.BlockNum, txn cdriver.TxNum) cdriver.RawVersion { + return blockTxIndexToBytes(bn, txn) +} + +func blockTxIndexToBytes(block cdriver.BlockNum, txNum cdriver.TxNum) []byte { + buf := make([]byte, 16) + binary.BigEndian.PutUint64(buf[:8], block) + binary.BigEndian.PutUint64(buf[8:], txNum) + return buf +} diff --git a/platform/view/services/db/driver/badger/proto/generate.go b/platform/view/services/db/driver/badger/proto/generate.go index 86f7cd879..2ce4b465f 100644 --- a/platform/view/services/db/driver/badger/proto/generate.go +++ b/platform/view/services/db/driver/badger/proto/generate.go @@ -6,4 +6,4 @@ SPDX-License-Identifier: Apache-2.0 package proto -//go:generate protoc kvs.proto --go_out=plugins=grpc:. +//go:generate protoc kvs.proto --go_out=. diff --git a/platform/view/services/db/driver/badger/proto/kvs.pb.go b/platform/view/services/db/driver/badger/proto/kvs.pb.go index c5d95bee8..c9bcedff0 100644 --- a/platform/view/services/db/driver/badger/proto/kvs.pb.go +++ b/platform/view/services/db/driver/badger/proto/kvs.pb.go @@ -1,116 +1,179 @@ // Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v5.28.1 // source: kvs.proto package proto import ( - fmt "fmt" - math "math" + reflect "reflect" + sync "sync" - proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) type VersionedValue struct { - Version uint32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` - Block uint64 `protobuf:"varint,2,opt,name=block,proto3" json:"block,omitempty"` - Txnum uint64 `protobuf:"varint,3,opt,name=txnum,proto3" json:"txnum,omitempty"` - Value []byte `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` - Meta map[string][]byte `protobuf:"bytes,5,rep,name=meta,proto3" json:"meta,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Version uint32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + KeyVersion []byte `protobuf:"bytes,3,opt,name=key_version,json=keyVersion,proto3" json:"key_version,omitempty"` + Meta map[string][]byte `protobuf:"bytes,4,rep,name=meta,proto3" json:"meta,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } -func (m *VersionedValue) Reset() { *m = VersionedValue{} } -func (m *VersionedValue) String() string { return proto.CompactTextString(m) } -func (*VersionedValue) ProtoMessage() {} -func (*VersionedValue) Descriptor() ([]byte, []int) { - return fileDescriptor_7630d382ef067a3e, []int{0} +func (x *VersionedValue) Reset() { + *x = VersionedValue{} + if protoimpl.UnsafeEnabled { + mi := &file_kvs_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } } -func (m *VersionedValue) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_VersionedValue.Unmarshal(m, b) -} -func (m *VersionedValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_VersionedValue.Marshal(b, m, deterministic) -} -func (m *VersionedValue) XXX_Merge(src proto.Message) { - xxx_messageInfo_VersionedValue.Merge(m, src) -} -func (m *VersionedValue) XXX_Size() int { - return xxx_messageInfo_VersionedValue.Size(m) -} -func (m *VersionedValue) XXX_DiscardUnknown() { - xxx_messageInfo_VersionedValue.DiscardUnknown(m) +func (x *VersionedValue) String() string { + return protoimpl.X.MessageStringOf(x) } -var xxx_messageInfo_VersionedValue proto.InternalMessageInfo +func (*VersionedValue) ProtoMessage() {} -func (m *VersionedValue) GetVersion() uint32 { - if m != nil { - return m.Version +func (x *VersionedValue) ProtoReflect() protoreflect.Message { + mi := &file_kvs_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms } - return 0 + return mi.MessageOf(x) +} + +// Deprecated: Use VersionedValue.ProtoReflect.Descriptor instead. +func (*VersionedValue) Descriptor() ([]byte, []int) { + return file_kvs_proto_rawDescGZIP(), []int{0} } -func (m *VersionedValue) GetBlock() uint64 { - if m != nil { - return m.Block +func (x *VersionedValue) GetVersion() uint32 { + if x != nil { + return x.Version } return 0 } -func (m *VersionedValue) GetTxnum() uint64 { - if m != nil { - return m.Txnum +func (x *VersionedValue) GetValue() []byte { + if x != nil { + return x.Value } - return 0 + return nil } -func (m *VersionedValue) GetValue() []byte { - if m != nil { - return m.Value +func (x *VersionedValue) GetKeyVersion() []byte { + if x != nil { + return x.KeyVersion } return nil } -func (m *VersionedValue) GetMeta() map[string][]byte { - if m != nil { - return m.Meta +func (x *VersionedValue) GetMeta() map[string][]byte { + if x != nil { + return x.Meta } return nil } -func init() { - proto.RegisterType((*VersionedValue)(nil), "proto.VersionedValue") - proto.RegisterMapType((map[string][]byte)(nil), "proto.VersionedValue.MetaEntry") +var File_kvs_proto protoreflect.FileDescriptor + +var file_kvs_proto_rawDesc = []byte{ + 0x0a, 0x09, 0x6b, 0x76, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x22, 0xcf, 0x01, 0x0a, 0x0e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x65, 0x64, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x6b, 0x65, 0x79, 0x5f, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x6b, 0x65, 0x79, 0x56, + 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x33, 0x0a, 0x04, 0x6d, 0x65, 0x74, 0x61, 0x18, 0x04, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x56, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x2e, 0x4d, 0x65, 0x74, 0x61, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x6d, 0x65, 0x74, 0x61, 0x1a, 0x37, 0x0a, 0x09, 0x4d, + 0x65, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x3a, 0x02, 0x38, 0x01, 0x42, 0x0d, 0x5a, 0x08, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, + 0x80, 0x01, 0x01, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_kvs_proto_rawDescOnce sync.Once + file_kvs_proto_rawDescData = file_kvs_proto_rawDesc +) + +func file_kvs_proto_rawDescGZIP() []byte { + file_kvs_proto_rawDescOnce.Do(func() { + file_kvs_proto_rawDescData = protoimpl.X.CompressGZIP(file_kvs_proto_rawDescData) + }) + return file_kvs_proto_rawDescData } -func init() { proto.RegisterFile("kvs.proto", fileDescriptor_7630d382ef067a3e) } - -var fileDescriptor_7630d382ef067a3e = []byte{ - // 185 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcc, 0x2e, 0x2b, 0xd6, - 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xb7, 0x18, 0xb9, 0xf8, 0xc2, 0x52, - 0x8b, 0x8a, 0x33, 0xf3, 0xf3, 0x52, 0x53, 0xc2, 0x12, 0x73, 0x4a, 0x53, 0x85, 0x24, 0xb8, 0xd8, - 0xcb, 0x20, 0x22, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0xbc, 0x41, 0x30, 0xae, 0x90, 0x08, 0x17, 0x6b, - 0x52, 0x4e, 0x7e, 0x72, 0xb6, 0x04, 0x93, 0x02, 0xa3, 0x06, 0x4b, 0x10, 0x84, 0x03, 0x12, 0x2d, - 0xa9, 0xc8, 0x2b, 0xcd, 0x95, 0x60, 0x86, 0x88, 0x82, 0x39, 0x20, 0xd1, 0x32, 0x90, 0x71, 0x12, - 0x2c, 0x0a, 0x8c, 0x1a, 0x3c, 0x41, 0x10, 0x8e, 0x90, 0x31, 0x17, 0x4b, 0x6e, 0x6a, 0x49, 0xa2, - 0x04, 0xab, 0x02, 0xb3, 0x06, 0xb7, 0x91, 0x3c, 0xc4, 0x2d, 0x7a, 0xa8, 0x0e, 0xd0, 0xf3, 0x4d, - 0x2d, 0x49, 0x74, 0xcd, 0x2b, 0x29, 0xaa, 0x0c, 0x02, 0x2b, 0x96, 0x32, 0xe7, 0xe2, 0x84, 0x0b, - 0x09, 0x09, 0x70, 0x31, 0x67, 0xa7, 0x56, 0x82, 0x5d, 0xc6, 0x19, 0x04, 0x62, 0x22, 0x6c, 0x62, - 0x42, 0xb2, 0xc9, 0x8a, 0xc9, 0x82, 0x31, 0x89, 0x0d, 0x6c, 0xbc, 0x31, 0x20, 0x00, 0x00, 0xff, - 0xff, 0xf8, 0x7e, 0x02, 0xba, 0xf7, 0x00, 0x00, 0x00, +var file_kvs_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_kvs_proto_goTypes = []interface{}{ + (*VersionedValue)(nil), // 0: proto.VersionedValue + nil, // 1: proto.VersionedValue.MetaEntry +} +var file_kvs_proto_depIdxs = []int32{ + 1, // 0: proto.VersionedValue.meta:type_name -> proto.VersionedValue.MetaEntry + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_kvs_proto_init() } +func file_kvs_proto_init() { + if File_kvs_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_kvs_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*VersionedValue); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_kvs_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_kvs_proto_goTypes, + DependencyIndexes: file_kvs_proto_depIdxs, + MessageInfos: file_kvs_proto_msgTypes, + }.Build() + File_kvs_proto = out.File + file_kvs_proto_rawDesc = nil + file_kvs_proto_goTypes = nil + file_kvs_proto_depIdxs = nil } diff --git a/platform/view/services/db/driver/badger/proto/kvs.proto b/platform/view/services/db/driver/badger/proto/kvs.proto index f5c4842cb..e01bcd50a 100644 --- a/platform/view/services/db/driver/badger/proto/kvs.proto +++ b/platform/view/services/db/driver/badger/proto/kvs.proto @@ -1,11 +1,13 @@ syntax = "proto3"; +option go_package = "./proto"; +option cc_generic_services = true; + package proto; message VersionedValue { uint32 version = 1; - uint64 block = 2; - uint64 txnum = 3; - bytes value = 4; - map meta = 5; + bytes value = 2; + bytes key_version = 3; + map meta = 4; } diff --git a/platform/view/services/db/driver/badger/range.go b/platform/view/services/db/driver/badger/range.go index 3beadb917..ce8070806 100644 --- a/platform/view/services/db/driver/badger/range.go +++ b/platform/view/services/db/driver/badger/range.go @@ -52,10 +52,9 @@ func (r *rangeScanIterator) Next() (*driver.VersionedRead, error) { r.it.Next() return &driver.VersionedRead{ - Key: dbKey, - Block: v.Block, - TxNum: v.Txnum, - Raw: v.Value, + Key: dbKey, + Version: v.KeyVersion, + Raw: v.Value, }, nil } diff --git a/platform/view/services/db/driver/driver.go b/platform/view/services/db/driver/driver.go index 90ca8e129..0a6c9ba7d 100644 --- a/platform/view/services/db/driver/driver.go +++ b/platform/view/services/db/driver/driver.go @@ -21,11 +21,7 @@ var ( type SQLError = error -type VersionedValue struct { - Raw driver.RawValue - Block driver.BlockNum - TxNum driver.TxNum -} +type VersionedValue = driver.VersionedValue type VersionedMetadataValue = driver.VersionedMetadataValue @@ -88,9 +84,9 @@ type UnversionedPersistence interface { type VersionedPersistence interface { BasePersistence[VersionedValue, VersionedRead] // GetStateMetadata gets the metadata and version for given namespace and key - GetStateMetadata(namespace driver.Namespace, key driver.PKey) (driver.Metadata, driver.BlockNum, driver.TxNum, error) + GetStateMetadata(namespace driver.Namespace, key driver.PKey) (driver.Metadata, driver.RawVersion, error) // SetStateMetadata sets the given metadata for the given namespace, key, and version - SetStateMetadata(namespace driver.Namespace, key driver.PKey, metadata driver.Metadata, block driver.BlockNum, txnum driver.TxNum) error + SetStateMetadata(namespace driver.Namespace, key driver.PKey, metadata driver.Metadata, version driver.RawVersion) error // SetStateMetadatas sets the given metadata for the given namespace, keys, and version SetStateMetadatas(ns driver.Namespace, kvs map[driver.PKey]driver.VersionedMetadataValue) map[driver.PKey]error } diff --git a/platform/view/services/db/driver/notifier/persistence.go b/platform/view/services/db/driver/notifier/persistence.go index 41dc3d82e..9224acf7e 100644 --- a/platform/view/services/db/driver/notifier/persistence.go +++ b/platform/view/services/db/driver/notifier/persistence.go @@ -191,12 +191,12 @@ func (db *VersionedPersistenceNotifier[P]) GetState(namespace driver2.Namespace, return db.Persistence.GetState(namespace, key) } -func (db *VersionedPersistenceNotifier[P]) GetStateMetadata(namespace driver2.Namespace, key driver2.PKey) (driver2.Metadata, driver2.BlockNum, driver2.TxNum, error) { +func (db *VersionedPersistenceNotifier[P]) GetStateMetadata(namespace driver2.Namespace, key driver2.PKey) (driver2.Metadata, driver2.RawVersion, error) { return db.Persistence.GetStateMetadata(namespace, key) } -func (db *VersionedPersistenceNotifier[P]) SetStateMetadata(namespace driver2.Namespace, key driver2.PKey, metadata driver2.Metadata, block driver2.BlockNum, txnum driver2.TxNum) error { - return db.Persistence.SetStateMetadata(namespace, key, metadata, block, txnum) +func (db *VersionedPersistenceNotifier[P]) SetStateMetadata(namespace driver2.Namespace, key driver2.PKey, metadata driver2.Metadata, version driver2.RawVersion) error { + return db.Persistence.SetStateMetadata(namespace, key, metadata, nil) } func (db *VersionedPersistenceNotifier[P]) SetStateMetadatas(ns driver2.Namespace, kvs map[driver2.PKey]driver2.VersionedMetadataValue) map[driver2.PKey]error { diff --git a/platform/view/services/db/driver/sql/common/base.go b/platform/view/services/db/driver/sql/common/base.go index 054bf5f6f..e59005db5 100644 --- a/platform/view/services/db/driver/sql/common/base.go +++ b/platform/view/services/db/driver/sql/common/base.go @@ -188,19 +188,20 @@ func (db *BasePersistence[V, R]) hasKeys(ns driver2.Namespace, pkeys []driver2.P } func (db *BasePersistence[V, R]) SetStateWithTx(tx *sql.Tx, ns driver2.Namespace, pkey string, value V) error { + if tx == nil { + panic("programming error, writing without ongoing update") + } + keys := db.ValueScanner.Columns() values := db.ValueScanner.WriteValue(value) // Get rawVal valIndex := slices.Index(keys, "val") val := values[valIndex].([]byte) - if len(val) == 0 { logger.Warnf("set key [%s:%s] to nil value, will be deleted instead", ns, pkey) return db.DeleteState(ns, pkey) } - if tx == nil { - panic("programming error, writing without ongoing update") - } + logger.Debugf("set state [%s,%s]", ns, pkey) // Overwrite rawVal diff --git a/platform/view/services/db/driver/sql/common/versioned.go b/platform/view/services/db/driver/sql/common/versioned.go index 619d72b12..12372d55d 100644 --- a/platform/view/services/db/driver/sql/common/versioned.go +++ b/platform/view/services/db/driver/sql/common/versioned.go @@ -11,12 +11,10 @@ import ( "database/sql" "encoding/gob" "fmt" - "strings" "github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors" driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver" - errors2 "github.com/pkg/errors" ) func NewVersionedReadScanner() *versionedReadScanner { return &versionedReadScanner{} } @@ -29,38 +27,38 @@ func NewVersionedMetadataValueScanner() *versionedMetadataValueScanner { type versionedReadScanner struct{} -func (s *versionedReadScanner) Columns() []string { return []string{"pkey", "block", "txnum", "val"} } +func (s *versionedReadScanner) Columns() []string { return []string{"pkey", "kversion", "val"} } func (s *versionedReadScanner) ReadValue(txs scannable) (driver.VersionedRead, error) { var r driver.VersionedRead - err := txs.Scan(&r.Key, &r.Block, &r.TxNum, &r.Raw) + err := txs.Scan(&r.Key, &r.Version, &r.Raw) return r, err } type versionedValueScanner struct{} -func (s *versionedValueScanner) Columns() []string { return []string{"val", "block", "txnum"} } +func (s *versionedValueScanner) Columns() []string { return []string{"val", "kversion"} } func (s *versionedValueScanner) ReadValue(txs scannable) (driver.VersionedValue, error) { var r driver.VersionedValue - err := txs.Scan(&r.Raw, &r.Block, &r.TxNum) + err := txs.Scan(&r.Raw, &r.Version) return r, err } func (s *versionedValueScanner) WriteValue(value driver.VersionedValue) []any { - return []any{value.Raw, value.Block, value.TxNum} + return []any{value.Raw, value.Version} } type versionedMetadataValueScanner struct{} func (s *versionedMetadataValueScanner) Columns() []string { - return []string{"metadata", "block", "txnum"} + return []string{"metadata", "kversion"} } func (s *versionedMetadataValueScanner) ReadValue(txs scannable) (driver2.VersionedMetadataValue, error) { var r driver2.VersionedMetadataValue var metadata []byte - if err := txs.Scan(&metadata, &r.Block, &r.TxNum); err != nil { + if err := txs.Scan(&metadata, &r.Version); err != nil { return r, err } else if meta, err := unmarshalMetadata(metadata); err != nil { return r, fmt.Errorf("error decoding metadata: %w", err) @@ -75,7 +73,7 @@ func (s *versionedMetadataValueScanner) WriteValue(value driver2.VersionedMetada if err != nil { return nil, err } - return []any{metadata, value.Block, value.TxNum}, nil + return []any{metadata, value.Version}, nil } type basePersistence[V any, R any] interface { @@ -106,11 +104,14 @@ func NewVersioned(readDB *sql.DB, writeDB *sql.DB, table string, errorWrapper dr return NewVersionedPersistence(base, base.table, base.errorWrapper, base.readDB, base.writeDB) } -func (db *VersionedPersistence) SetStateMetadata(ns driver2.Namespace, key driver2.PKey, metadata driver2.Metadata, block driver2.BlockNum, txnum driver2.TxNum) error { +func (db *VersionedPersistence) SetStateMetadata(ns driver2.Namespace, key driver2.PKey, metadata driver2.Metadata, version driver2.RawVersion) error { if len(metadata) == 0 { return nil } - return db.SetStateMetadatas(ns, map[driver2.PKey]driver2.VersionedMetadataValue{key: {Block: block, TxNum: txnum, Metadata: metadata}})[key] + return db.SetStateMetadatas( + ns, + map[driver2.PKey]driver2.VersionedMetadataValue{key: {Version: version, Metadata: metadata}}, + )[key] } func (db *VersionedPersistence) SetStateMetadatas(ns driver2.Namespace, kvs map[driver2.PKey]driver2.VersionedMetadataValue) map[driver2.PKey]error { @@ -129,21 +130,28 @@ func (db *VersionedPersistence) SetStateMetadatas(ns driver2.Namespace, kvs map[ return db.UpsertStates(ns, db.metadataScanner.Columns(), vals) } -// TODO: AF Reuse code from basePersistence -func (db *VersionedPersistence) GetStateMetadata(namespace driver2.Namespace, key driver2.PKey) (driver2.Metadata, driver2.BlockNum, driver2.TxNum, error) { - where, args := Where(db.hasKey(namespace, key)) - query := fmt.Sprintf("SELECT %s FROM %s %s", strings.Join(db.metadataScanner.Columns(), ", "), db.table, where) - logger.Debug(query, args) +func (db *VersionedPersistence) GetStateMetadata(namespace driver2.Namespace, key driver2.PKey) (driver2.Metadata, driver2.RawVersion, error) { + var m []byte + var meta driver2.Metadata + var kversion driver2.RawVersion - row := db.readDB.QueryRow(query, args...) - if value, err := db.metadataScanner.ReadValue(row); err == nil { - return value.Metadata, value.Block, value.TxNum, nil - } else if err == sql.ErrNoRows { - logger.Debugf("not found: [%s:%s]", namespace, key) - return nil, 0, 0, nil - } else { - return nil, 0, 0, errors2.Wrapf(err, "error querying db: %s", query) + query := fmt.Sprintf("SELECT metadata, kversion FROM %s WHERE ns = $1 AND pkey = $2", db.table) + logger.Debug(query, namespace, key) + + row := db.readDB.QueryRow(query, namespace, key) + if err := row.Scan(&m, &kversion); err != nil { + if err == sql.ErrNoRows { + logger.Debugf("not found: [%s:%s]", namespace, key) + return meta, nil, nil + } + return meta, nil, fmt.Errorf("error querying db: %w", err) } + meta, err := unmarshalMetadata(m) + if err != nil { + return meta, nil, fmt.Errorf("error decoding metadata: %w", err) + } + + return meta, kversion, err } func (db *VersionedPersistence) CreateSchema() error { @@ -151,9 +159,8 @@ func (db *VersionedPersistence) CreateSchema() error { CREATE TABLE IF NOT EXISTS %s ( ns TEXT NOT NULL, pkey BYTEA NOT NULL, - block BIGINT NOT NULL DEFAULT 0, - txnum BIGINT NOT NULL DEFAULT 0, val BYTEA NOT NULL DEFAULT '', + kversion BYTEA DEFAULT '', metadata BYTEA NOT NULL DEFAULT '', version INT NOT NULL DEFAULT 0, PRIMARY KEY (pkey, ns) diff --git a/platform/view/services/db/driver/sql/postgres/versioned.go b/platform/view/services/db/driver/sql/postgres/versioned.go index 493cd135e..e48464fed 100644 --- a/platform/view/services/db/driver/sql/postgres/versioned.go +++ b/platform/view/services/db/driver/sql/postgres/versioned.go @@ -75,12 +75,12 @@ func (db *VersionedPersistence) Discard() error { return db.p.Discard() } -func (db *VersionedPersistence) GetStateMetadata(namespace driver2.Namespace, key driver2.PKey) (driver2.Metadata, driver2.BlockNum, driver2.TxNum, error) { +func (db *VersionedPersistence) GetStateMetadata(namespace driver2.Namespace, key driver2.PKey) (driver2.Metadata, driver2.RawVersion, error) { return db.p.GetStateMetadata(namespace, key) } -func (db *VersionedPersistence) SetStateMetadata(namespace driver2.Namespace, key driver2.PKey, metadata driver2.Metadata, block driver2.BlockNum, txnum driver2.TxNum) error { - return db.p.SetStateMetadata(namespace, key, metadata, block, txnum) +func (db *VersionedPersistence) SetStateMetadata(namespace driver2.Namespace, key driver2.PKey, metadata driver2.Metadata, version driver2.RawVersion) error { + return db.p.SetStateMetadata(namespace, key, metadata, version) } func (db *VersionedPersistence) SetStateMetadatas(ns driver2.Namespace, kvs map[driver2.PKey]driver2.VersionedMetadataValue) map[driver2.PKey]error {