Skip to content

Commit

Permalink
etcd: implement transactions
Browse files Browse the repository at this point in the history
- use temporary db for lookups in transactions
- use batch implementation to commit transaction
  • Loading branch information
neolynx committed Jul 31, 2024
1 parent 7a01c9c commit 6777179
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 24 deletions.
5 changes: 3 additions & 2 deletions database/etcddb/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func (s *EtcDDBSuite) TestTransactionCommit(c *C) {
transaction.Put(key2, value2)
v, err := s.db.Get(key)
c.Check(v, DeepEquals, value)
transaction.Delete(key)
err = transaction.Delete(key)
c.Assert(err, IsNil)

_, err = transaction.Get(key2)
c.Assert(err, IsNil)
Expand All @@ -152,5 +153,5 @@ func (s *EtcDDBSuite) TestTransactionCommit(c *C) {
c.Check(v2, DeepEquals, value2)

_, err = transaction.Get(key)
c.Assert(err, IsNil)
c.Assert(err, NotNil)
}
6 changes: 3 additions & 3 deletions database/etcddb/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (s *EtcDStorage) Get(key []byte) (value []byte, err error) {
}
for _, kv := range getResp.Kvs {
value = kv.Value
break
}
if len(value) == 0 {
err = database.ErrNotFound
Expand Down Expand Up @@ -169,12 +170,11 @@ func (s *EtcDStorage) CreateBatch() database.Batch {

// OpenTransaction creates new transaction.
func (s *EtcDStorage) OpenTransaction() (database.Transaction, error) {
cli, err := internalOpen(s.url)
tmpdb, err := s.CreateTemporary()
if err != nil {
return nil, err
}
kvc := clientv3.NewKV(cli)
return &transaction{t: kvc}, nil
return &transaction{s: s, tmpdb: tmpdb}, nil
}

// CompactDB does nothing for etcd
Expand Down
58 changes: 39 additions & 19 deletions database/etcddb/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,71 @@ package etcddb
import (
"github.com/aptly-dev/aptly/database"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/clientv3util"
)

type transaction struct {
t clientv3.KV
s *EtcDStorage
tmpdb database.Storage
ops []clientv3.Op
}

// Get implements database.Reader interface.
func (t *transaction) Get(key []byte) ([]byte, error) {
getResp, err := t.t.Get(Ctx, string(key))
func (t *transaction) Get(key []byte) (value []byte, err error) {
value, err = t.tmpdb.Get(key)
// if not found, search main db
if err != nil {
return nil, err
value, err = t.s.Get(key)
}

var value []byte
for _, kv := range getResp.Kvs {
valc := make([]byte, len(kv.Value))
copy(valc, kv.Value)
value = valc
}

return value, nil
return
}

// Put implements database.Writer interface.
func (t *transaction) Put(key, value []byte) (err error) {
_, err = t.t.Txn(Ctx).
If().Then(clientv3.OpPut(string(key), string(value))).Commit()
err = t.tmpdb.Put(key, value)
if err != nil {
return
}
t.ops = append(t.ops, clientv3.OpPut(string(key), string(value)))
return
}

// Delete implements database.Writer interface.
func (t *transaction) Delete(key []byte) (err error) {
_, err = t.t.Txn(Ctx).
If(clientv3util.KeyExists(string(key))).
Then(clientv3.OpDelete(string(key))).Commit()
err = t.tmpdb.Delete(key)
if err != nil {
return
}
t.ops = append(t.ops, clientv3.OpDelete(string(key)))
return
}

func (t *transaction) Commit() (err error) {
kv := clientv3.NewKV(t.s.db)

batchSize := 128
for i := 0; i < len(t.ops); i += batchSize {
txn := kv.Txn(Ctx)
end := i + batchSize
if end > len(t.ops) {
end = len(t.ops)
}

batch := t.ops[i:end]
txn.Then(batch...)
_, err = txn.Commit()
if err != nil {
panic(err)
}
}
t.ops = []clientv3.Op{}

return
}

// Discard is safe to call after Commit(), it would be no-op
func (t *transaction) Discard() {
t.ops = []clientv3.Op{}
t.tmpdb.Drop()
return
}

Expand Down

0 comments on commit 6777179

Please sign in to comment.