Skip to content

Commit

Permalink
feat: Add unpublished operation store option
Browse files Browse the repository at this point in the history
Add unpublished operation store option to the following components:
1. resolver (add unpublished operation to the unpublished operation store during ingestion)
2. operation processor (add operation from unpublished store to the operations from published store and assemble document)
3. txn processor (will delete all unpublished operations once they are published to operation store)

Closes #610

Signed-off-by: Sandra Vrtikapa <sandra.vrtikapa@securekey.com>
  • Loading branch information
sandrask committed Sep 10, 2021
1 parent bd8431c commit 4baed6e
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 19 deletions.
96 changes: 85 additions & 11 deletions pkg/dochandler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/trustbloc/edge-core/pkg/log"

Expand Down Expand Up @@ -49,6 +50,16 @@ type DocumentHandler struct {
aliases []string // namespace aliases
domain string
label string

unpublishedOperationStore unpublishedOperationStore
unpublishedOperationTypes []operation.Type
}

type unpublishedOperationStore interface {
// Put saves operation into unpublished operation store.
Put(op *operation.AnchoredOperation) error
// Delete deletes operation from unpublished operation store.
Delete(suffix string) error
}

// operationProcessor is an interface which resolves the document based on the ID.
Expand Down Expand Up @@ -78,14 +89,24 @@ func WithLabel(label string) Option {
}
}

// WithUnpublishedOperationStore stores unpublished operation into unpublished operation store.
func WithUnpublishedOperationStore(store unpublishedOperationStore, operationTypes []operation.Type) Option {
return func(opts *DocumentHandler) {
opts.unpublishedOperationStore = store
opts.unpublishedOperationTypes = operationTypes
}
}

// New creates a new document handler with the context.
func New(namespace string, aliases []string, pc protocol.Client, writer batchWriter, processor operationProcessor, opts ...Option) *DocumentHandler {
dh := &DocumentHandler{
protocol: pc,
processor: processor,
writer: writer,
namespace: namespace,
aliases: aliases,
protocol: pc,
processor: processor,
writer: writer,
namespace: namespace,
aliases: aliases,
unpublishedOperationStore: &noopUnpublishedOpsStore{},
unpublishedOperationTypes: []operation.Type{},
}

// apply options
Expand All @@ -102,7 +123,7 @@ func (r *DocumentHandler) Namespace() string {
}

// ProcessOperation validates operation and adds it to the batch.
func (r *DocumentHandler) ProcessOperation(operationBuffer []byte, protocolGenesisTime uint64) (*document.ResolutionResult, error) {
func (r *DocumentHandler) ProcessOperation(operationBuffer []byte, protocolGenesisTime uint64) (*document.ResolutionResult, error) { //nolint:gocyclo
pv, err := r.protocol.Get(protocolGenesisTime)
if err != nil {
return nil, err
Expand All @@ -114,29 +135,37 @@ func (r *DocumentHandler) ProcessOperation(operationBuffer []byte, protocolGenes
}

// perform validation for operation request
if err := r.validateOperation(op, pv); err != nil {
err = r.validateOperation(op, pv)
if err != nil {
logger.Warnf("Failed to validate operation: %s", err.Error())

return nil, err
}

if op.Type != operation.TypeCreate {
internalResult, err := r.processor.Resolve(op.UniqueSuffix)
if err != nil {
logger.Debugf("Failed to resolve suffix[%s] for operation type[%s]: %s", op.UniqueSuffix, op.Type, err.Error())
internalResult, innerErr := r.processor.Resolve(op.UniqueSuffix)
if innerErr != nil {
logger.Debugf("Failed to resolve suffix[%s] for operation type[%s]: %w", op.UniqueSuffix, op.Type, innerErr)

return nil, err
return nil, innerErr
}

if op.Type == operation.TypeUpdate || op.Type == operation.TypeDeactivate {
op.AnchorOrigin = internalResult.AnchorOrigin
}
}

err = r.addOperationToUnpublishedOpsStore(op, pv)
if err != nil {
return nil, fmt.Errorf("failed to add operation for suffix[%s] to unpublished operation store: %s", op.UniqueSuffix, err.Error())
}

// validated operation will be added to the batch
if err := r.addToBatch(op, pv.Protocol().GenesisTime); err != nil {
logger.Errorf("Failed to add operation to batch: %s", err.Error())

r.deleteOperationFromUnpublishedOpsStore(op.UniqueSuffix)

return nil, err
}

Expand All @@ -150,6 +179,40 @@ func (r *DocumentHandler) ProcessOperation(operationBuffer []byte, protocolGenes
return nil, nil
}

func (r *DocumentHandler) addOperationToUnpublishedOpsStore(op *operation.Operation, pv protocol.Version) error {
if !contains(r.unpublishedOperationTypes, op.Type) {
return nil
}

unpublishedOp := &operation.AnchoredOperation{
Type: op.Type,
UniqueSuffix: op.UniqueSuffix,
OperationBuffer: op.OperationBuffer,
TransactionTime: uint64(time.Now().Unix()),
ProtocolGenesisTime: pv.Protocol().GenesisTime,
AnchorOrigin: op.AnchorOrigin,
}

return r.unpublishedOperationStore.Put(unpublishedOp)
}

func (r *DocumentHandler) deleteOperationFromUnpublishedOpsStore(suffix string) {
err := r.unpublishedOperationStore.Delete(suffix)
if err != nil {
logger.Warnf("Failed to delete operation from unpublished store: %s", err.Error())
}
}

func contains(values []operation.Type, value operation.Type) bool {
for _, v := range values {
if v == value {
return true
}
}

return false
}

func (r *DocumentHandler) getCreateResult(op *operation.Operation, pv protocol.Version) (*protocol.ResolutionModel, error) {
// we can use operation applier to generate create response even though operation is not anchored yet
anchored := &operation.AnchoredOperation{
Expand Down Expand Up @@ -406,3 +469,14 @@ func getSuffix(namespace, idOrDocument string) (string, error) {

return idOrDocument[adjustedPos:], nil
}

type noopUnpublishedOpsStore struct {
}

func (noop *noopUnpublishedOpsStore) Put(_ *operation.AnchoredOperation) error {
return nil
}

func (noop *noopUnpublishedOpsStore) Delete(_ string) error {
return nil
}
88 changes: 84 additions & 4 deletions pkg/dochandler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,73 @@ func TestDocumentHandler_ProcessOperation_Update(t *testing.T) {
require.Nil(t, doc)
})

t.Run("success - unpublished operation store option", func(t *testing.T) {
store := mocks.NewMockOperationStore(nil)

opt := WithUnpublishedOperationStore(&noopUnpublishedOpsStore{}, []operation.Type{operation.TypeUpdate})

dochandler, cleanup := getDocumentHandler(store, opt)
require.NotNil(t, dochandler)
defer cleanup()

createOp := getCreateOperation()

createOpBuffer, err := json.Marshal(createOp)
require.NoError(t, err)

updateOp, err := generateUpdateOperation(createOp.UniqueSuffix)
require.NoError(t, err)

err = store.Put(&operation.AnchoredOperation{UniqueSuffix: createOp.UniqueSuffix, Type: operation.TypeCreate, OperationBuffer: createOpBuffer})
require.NoError(t, err)

doc, err := dochandler.ProcessOperation(updateOp, 0)
require.NoError(t, err)
require.Nil(t, doc)
})

t.Run("error - unpublished operation store put error", func(t *testing.T) {
store := mocks.NewMockOperationStore(nil)

opt := WithUnpublishedOperationStore(
&mockUnpublishedOpsStore{PutErr: fmt.Errorf("put error")},
[]operation.Type{operation.TypeUpdate})

dochandler, cleanup := getDocumentHandler(store, opt)
require.NotNil(t, dochandler)
defer cleanup()

createOp := getCreateOperation()

createOpBuffer, err := json.Marshal(createOp)
require.NoError(t, err)

updateOp, err := generateUpdateOperation(createOp.UniqueSuffix)
require.NoError(t, err)

err = store.Put(&operation.AnchoredOperation{UniqueSuffix: createOp.UniqueSuffix, Type: operation.TypeCreate, OperationBuffer: createOpBuffer})
require.NoError(t, err)

doc, err := dochandler.ProcessOperation(updateOp, 0)
require.Error(t, err)
require.Nil(t, doc)
require.Contains(t, err.Error(), "put error")
})

t.Run("error - unpublished operation store delete error", func(t *testing.T) {
store := mocks.NewMockOperationStore(nil)

opt := WithUnpublishedOperationStore(
&mockUnpublishedOpsStore{DeleteErr: fmt.Errorf("delete error")},
[]operation.Type{operation.TypeUpdate})

dochandler, cleanup := getDocumentHandler(store, opt)
require.NotNil(t, dochandler)
defer cleanup()

dochandler.deleteOperationFromUnpublishedOpsStore("suffix")
})

t.Run("error - processor error", func(t *testing.T) {
store := mocks.NewMockOperationStore(nil)

Expand Down Expand Up @@ -526,11 +593,11 @@ func (m *BatchContext) OperationQueue() cutter.OperationQueue {

type cleanup func()

func getDocumentHandler(store processor.OperationStoreClient) (*DocumentHandler, cleanup) {
return getDocumentHandlerWithProtocolClient(store, newMockProtocolClient())
func getDocumentHandler(store *mocks.MockOperationStore, opts ...Option) (*DocumentHandler, cleanup) {
return getDocumentHandlerWithProtocolClient(store, newMockProtocolClient(), opts...)
}

func getDocumentHandlerWithProtocolClient(store processor.OperationStoreClient, protocol *mocks.MockProtocolClient) (*DocumentHandler, cleanup) {
func getDocumentHandlerWithProtocolClient(store *mocks.MockOperationStore, protocol *mocks.MockProtocolClient, opts ...Option) (*DocumentHandler, cleanup) { //nolint: interfacer
processor := processor.New("test", store, protocol)

ctx := &BatchContext{
Expand All @@ -547,7 +614,7 @@ func getDocumentHandlerWithProtocolClient(store processor.OperationStoreClient,
// start go routine for cutting batches
writer.Start()

return New(namespace, []string{alias}, protocol, writer, processor), func() { writer.Stop() }
return New(namespace, []string{alias}, protocol, writer, processor, opts...), func() { writer.Stop() }
}

func getCreateOperation() *model.Operation {
Expand Down Expand Up @@ -842,3 +909,16 @@ func newMockProtocolClient() *mocks.MockProtocolClient {

return pc
}

type mockUnpublishedOpsStore struct {
PutErr error
DeleteErr error
}

func (m *mockUnpublishedOpsStore) Put(_ *operation.AnchoredOperation) error {
return m.PutErr
}

func (m *mockUnpublishedOpsStore) Delete(_ string) error {
return m.DeleteErr
}
41 changes: 39 additions & 2 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type OperationProcessor struct {
name string
store OperationStoreClient
pc protocol.Client

unpublishedOperationStore unpublishedOperationStore
}

// OperationStoreClient defines interface for retrieving all operations related to document.
Expand All @@ -34,9 +36,31 @@ type OperationStoreClient interface {
Get(uniqueSuffix string) ([]*operation.AnchoredOperation, error)
}

type unpublishedOperationStore interface {
// Get retrieves unpublished operation related to document, we can have only one unpublished operation.
Get(uniqueSuffix string) (*operation.AnchoredOperation, error)
}

// New returns new operation processor with the given name. (Note that name is only used for logging.)
func New(name string, store OperationStoreClient, pc protocol.Client) *OperationProcessor {
return &OperationProcessor{name: name, store: store, pc: pc}
func New(name string, store OperationStoreClient, pc protocol.Client, opts ...Option) *OperationProcessor {
op := &OperationProcessor{name: name, store: store, pc: pc, unpublishedOperationStore: &noopUnpublishedOpsStore{}}

// apply options
for _, opt := range opts {
opt(op)
}

return op
}

// Option is an option for operation processor.
type Option func(opts *OperationProcessor)

// WithUnpublishedOperationStore stores unpublished operation into unpublished operation store.
func WithUnpublishedOperationStore(store unpublishedOperationStore) Option {
return func(opts *OperationProcessor) {
opts.unpublishedOperationStore = store
}
}

// Resolve document based on the given unique suffix.
Expand All @@ -48,6 +72,13 @@ func (s *OperationProcessor) Resolve(uniqueSuffix string) (*protocol.ResolutionM
return nil, err
}

unpublishedOp, err := s.unpublishedOperationStore.Get(uniqueSuffix)
if err == nil {
logger.Debugf("[%s] Found unpublished %s operation for unique suffix [%s]", s.name, unpublishedOp.Type, uniqueSuffix)

ops = append(ops, unpublishedOp)
}

sortOperations(ops)

logger.Debugf("[%s] Found %d operations for unique suffix [%s]: %+v", s.name, len(ops), uniqueSuffix, ops)
Expand Down Expand Up @@ -322,3 +353,9 @@ func (s *OperationProcessor) getCommitment(op *operation.AnchoredOperation) (str

return nextCommitment, nil
}

type noopUnpublishedOpsStore struct{}

func (noop *noopUnpublishedOpsStore) Get(_ string) (*operation.AnchoredOperation, error) {
return nil, fmt.Errorf("not found")
}
28 changes: 28 additions & 0 deletions pkg/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ func TestUpdateDocument(t *testing.T) {
require.Equal(t, "special2", didDoc["test"])
})

t.Run("success - with unpublished operation store", func(t *testing.T) {
store, uniqueSuffix := getDefaultStore(recoveryKey, updateKey)

updateOp, _, err := getAnchoredUpdateOperation(updateKey, uniqueSuffix, 1)
require.Nil(t, err)

p := New("test", store, pc, WithUnpublishedOperationStore(&mockUnpublishedOpsStore{AnchoredOp: updateOp}))
result, err := p.Resolve(uniqueSuffix)
require.Nil(t, err)

// check if service type value is updated (done via json patch)
didDoc := document.DidDocumentFromJSONLDObject(result.Doc)
require.Equal(t, "special1", didDoc["test"])
})

t.Run("success - protocol version changed between create/update", func(t *testing.T) {
store, uniqueSuffix := getDefaultStore(recoveryKey, updateKey)

Expand Down Expand Up @@ -1353,3 +1368,16 @@ func newMockProtocolClient() *mocks.MockProtocolClient {

return pc
}

type mockUnpublishedOpsStore struct {
GetErr error
AnchoredOp *operation.AnchoredOperation
}

func (m *mockUnpublishedOpsStore) Get(_ string) (*operation.AnchoredOperation, error) {
if m.GetErr != nil {
return nil, m.GetErr
}

return m.AnchoredOp, nil
}
Loading

0 comments on commit 4baed6e

Please sign in to comment.