diff --git a/database/etcddb/database_test.go b/database/etcddb/database_test.go index 1e601b4e4..da1a741b7 100644 --- a/database/etcddb/database_test.go +++ b/database/etcddb/database_test.go @@ -132,7 +132,8 @@ func (s *EtcDDBSuite) TestTransactionCommit(c *C) { transaction.Put(key2, value2) v, err := s.db.Get(key) c.Check(v, DeepEquals, value) - transaction.Delete(key) + err = transaction.Delete(key) + c.Assert(err, IsNil) _, err = transaction.Get(key2) c.Assert(err, IsNil) @@ -152,5 +153,5 @@ func (s *EtcDDBSuite) TestTransactionCommit(c *C) { c.Check(v2, DeepEquals, value2) _, err = transaction.Get(key) - c.Assert(err, IsNil) + c.Assert(err, NotNil) } diff --git a/database/etcddb/storage.go b/database/etcddb/storage.go index b1faa8855..64d67eb5e 100644 --- a/database/etcddb/storage.go +++ b/database/etcddb/storage.go @@ -40,6 +40,7 @@ func (s *EtcDStorage) Get(key []byte) (value []byte, err error) { } for _, kv := range getResp.Kvs { value = kv.Value + break } if len(value) == 0 { err = database.ErrNotFound @@ -169,12 +170,11 @@ func (s *EtcDStorage) CreateBatch() database.Batch { // OpenTransaction creates new transaction. func (s *EtcDStorage) OpenTransaction() (database.Transaction, error) { - cli, err := internalOpen(s.url) + tmpdb, err := s.CreateTemporary() if err != nil { return nil, err } - kvc := clientv3.NewKV(cli) - return &transaction{t: kvc}, nil + return &transaction{s: s, tmpdb: tmpdb}, nil } // CompactDB does nothing for etcd diff --git a/database/etcddb/transaction.go b/database/etcddb/transaction.go index 4b0830563..01f54dad1 100644 --- a/database/etcddb/transaction.go +++ b/database/etcddb/transaction.go @@ -3,51 +3,71 @@ package etcddb import ( "github.com/aptly-dev/aptly/database" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/client/v3/clientv3util" ) type transaction struct { - t clientv3.KV + s *EtcDStorage + tmpdb database.Storage + ops []clientv3.Op } // Get implements database.Reader interface. -func (t *transaction) Get(key []byte) ([]byte, error) { - getResp, err := t.t.Get(Ctx, string(key)) +func (t *transaction) Get(key []byte) (value []byte, err error) { + value, err = t.tmpdb.Get(key) + // if not found, search main db if err != nil { - return nil, err + value, err = t.s.Get(key) } - - var value []byte - for _, kv := range getResp.Kvs { - valc := make([]byte, len(kv.Value)) - copy(valc, kv.Value) - value = valc - } - - return value, nil + return } // Put implements database.Writer interface. func (t *transaction) Put(key, value []byte) (err error) { - _, err = t.t.Txn(Ctx). - If().Then(clientv3.OpPut(string(key), string(value))).Commit() + err = t.tmpdb.Put(key, value) + if err != nil { + return + } + t.ops = append(t.ops, clientv3.OpPut(string(key), string(value))) return } // Delete implements database.Writer interface. func (t *transaction) Delete(key []byte) (err error) { - _, err = t.t.Txn(Ctx). - If(clientv3util.KeyExists(string(key))). - Then(clientv3.OpDelete(string(key))).Commit() + err = t.tmpdb.Delete(key) + if err != nil { + return + } + t.ops = append(t.ops, clientv3.OpDelete(string(key))) return } func (t *transaction) Commit() (err error) { + kv := clientv3.NewKV(t.s.db) + + batchSize := 128 + for i := 0; i < len(t.ops); i += batchSize { + txn := kv.Txn(Ctx) + end := i + batchSize + if end > len(t.ops) { + end = len(t.ops) + } + + batch := t.ops[i:end] + txn.Then(batch...) + _, err = txn.Commit() + if err != nil { + panic(err) + } + } + t.ops = []clientv3.Op{} + return } // Discard is safe to call after Commit(), it would be no-op func (t *transaction) Discard() { + t.ops = []clientv3.Op{} + t.tmpdb.Drop() return }