Skip to content

Commit

Permalink
Merge pull request #611 from sandrask/issue-610
Browse files Browse the repository at this point in the history
feat: Add unpublished operation store option
  • Loading branch information
sandrask authored Sep 10, 2021
2 parents 967518c + 4baed6e commit a2e8795
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 19 deletions.
96 changes: 85 additions & 11 deletions pkg/dochandler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/trustbloc/edge-core/pkg/log"

Expand Down Expand Up @@ -49,6 +50,16 @@ type DocumentHandler struct {
aliases []string // namespace aliases
domain string
label string

unpublishedOperationStore unpublishedOperationStore
unpublishedOperationTypes []operation.Type
}

type unpublishedOperationStore interface {
// Put saves operation into unpublished operation store.
Put(op *operation.AnchoredOperation) error
// Delete deletes operation from unpublished operation store.
Delete(suffix string) error
}

// operationProcessor is an interface which resolves the document based on the ID.
Expand Down Expand Up @@ -78,14 +89,24 @@ func WithLabel(label string) Option {
}
}

// WithUnpublishedOperationStore stores unpublished operation into unpublished operation store.
func WithUnpublishedOperationStore(store unpublishedOperationStore, operationTypes []operation.Type) Option {
return func(opts *DocumentHandler) {
opts.unpublishedOperationStore = store
opts.unpublishedOperationTypes = operationTypes
}
}

// New creates a new document handler with the context.
func New(namespace string, aliases []string, pc protocol.Client, writer batchWriter, processor operationProcessor, opts ...Option) *DocumentHandler {
dh := &DocumentHandler{
protocol: pc,
processor: processor,
writer: writer,
namespace: namespace,
aliases: aliases,
protocol: pc,
processor: processor,
writer: writer,
namespace: namespace,
aliases: aliases,
unpublishedOperationStore: &noopUnpublishedOpsStore{},
unpublishedOperationTypes: []operation.Type{},
}

// apply options
Expand All @@ -102,7 +123,7 @@ func (r *DocumentHandler) Namespace() string {
}

// ProcessOperation validates operation and adds it to the batch.
func (r *DocumentHandler) ProcessOperation(operationBuffer []byte, protocolGenesisTime uint64) (*document.ResolutionResult, error) {
func (r *DocumentHandler) ProcessOperation(operationBuffer []byte, protocolGenesisTime uint64) (*document.ResolutionResult, error) { //nolint:gocyclo
pv, err := r.protocol.Get(protocolGenesisTime)
if err != nil {
return nil, err
Expand All @@ -114,29 +135,37 @@ func (r *DocumentHandler) ProcessOperation(operationBuffer []byte, protocolGenes
}

// perform validation for operation request
if err := r.validateOperation(op, pv); err != nil {
err = r.validateOperation(op, pv)
if err != nil {
logger.Warnf("Failed to validate operation: %s", err.Error())

return nil, err
}

if op.Type != operation.TypeCreate {
internalResult, err := r.processor.Resolve(op.UniqueSuffix)
if err != nil {
logger.Debugf("Failed to resolve suffix[%s] for operation type[%s]: %s", op.UniqueSuffix, op.Type, err.Error())
internalResult, innerErr := r.processor.Resolve(op.UniqueSuffix)
if innerErr != nil {
logger.Debugf("Failed to resolve suffix[%s] for operation type[%s]: %w", op.UniqueSuffix, op.Type, innerErr)

return nil, err
return nil, innerErr
}

if op.Type == operation.TypeUpdate || op.Type == operation.TypeDeactivate {
op.AnchorOrigin = internalResult.AnchorOrigin
}
}

err = r.addOperationToUnpublishedOpsStore(op, pv)
if err != nil {
return nil, fmt.Errorf("failed to add operation for suffix[%s] to unpublished operation store: %s", op.UniqueSuffix, err.Error())
}

// validated operation will be added to the batch
if err := r.addToBatch(op, pv.Protocol().GenesisTime); err != nil {
logger.Errorf("Failed to add operation to batch: %s", err.Error())

r.deleteOperationFromUnpublishedOpsStore(op.UniqueSuffix)

return nil, err
}

Expand All @@ -150,6 +179,40 @@ func (r *DocumentHandler) ProcessOperation(operationBuffer []byte, protocolGenes
return nil, nil
}

func (r *DocumentHandler) addOperationToUnpublishedOpsStore(op *operation.Operation, pv protocol.Version) error {
if !contains(r.unpublishedOperationTypes, op.Type) {
return nil
}

unpublishedOp := &operation.AnchoredOperation{
Type: op.Type,
UniqueSuffix: op.UniqueSuffix,
OperationBuffer: op.OperationBuffer,
TransactionTime: uint64(time.Now().Unix()),
ProtocolGenesisTime: pv.Protocol().GenesisTime,
AnchorOrigin: op.AnchorOrigin,
}

return r.unpublishedOperationStore.Put(unpublishedOp)
}

func (r *DocumentHandler) deleteOperationFromUnpublishedOpsStore(suffix string) {
err := r.unpublishedOperationStore.Delete(suffix)
if err != nil {
logger.Warnf("Failed to delete operation from unpublished store: %s", err.Error())
}
}

func contains(values []operation.Type, value operation.Type) bool {
for _, v := range values {
if v == value {
return true
}
}

return false
}

func (r *DocumentHandler) getCreateResult(op *operation.Operation, pv protocol.Version) (*protocol.ResolutionModel, error) {
// we can use operation applier to generate create response even though operation is not anchored yet
anchored := &operation.AnchoredOperation{
Expand Down Expand Up @@ -406,3 +469,14 @@ func getSuffix(namespace, idOrDocument string) (string, error) {

return idOrDocument[adjustedPos:], nil
}

type noopUnpublishedOpsStore struct {
}

func (noop *noopUnpublishedOpsStore) Put(_ *operation.AnchoredOperation) error {
return nil
}

func (noop *noopUnpublishedOpsStore) Delete(_ string) error {
return nil
}
88 changes: 84 additions & 4 deletions pkg/dochandler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,73 @@ func TestDocumentHandler_ProcessOperation_Update(t *testing.T) {
require.Nil(t, doc)
})

t.Run("success - unpublished operation store option", func(t *testing.T) {
store := mocks.NewMockOperationStore(nil)

opt := WithUnpublishedOperationStore(&noopUnpublishedOpsStore{}, []operation.Type{operation.TypeUpdate})

dochandler, cleanup := getDocumentHandler(store, opt)
require.NotNil(t, dochandler)
defer cleanup()

createOp := getCreateOperation()

createOpBuffer, err := json.Marshal(createOp)
require.NoError(t, err)

updateOp, err := generateUpdateOperation(createOp.UniqueSuffix)
require.NoError(t, err)

err = store.Put(&operation.AnchoredOperation{UniqueSuffix: createOp.UniqueSuffix, Type: operation.TypeCreate, OperationBuffer: createOpBuffer})
require.NoError(t, err)

doc, err := dochandler.ProcessOperation(updateOp, 0)
require.NoError(t, err)
require.Nil(t, doc)
})

t.Run("error - unpublished operation store put error", func(t *testing.T) {
store := mocks.NewMockOperationStore(nil)

opt := WithUnpublishedOperationStore(
&mockUnpublishedOpsStore{PutErr: fmt.Errorf("put error")},
[]operation.Type{operation.TypeUpdate})

dochandler, cleanup := getDocumentHandler(store, opt)
require.NotNil(t, dochandler)
defer cleanup()

createOp := getCreateOperation()

createOpBuffer, err := json.Marshal(createOp)
require.NoError(t, err)

updateOp, err := generateUpdateOperation(createOp.UniqueSuffix)
require.NoError(t, err)

err = store.Put(&operation.AnchoredOperation{UniqueSuffix: createOp.UniqueSuffix, Type: operation.TypeCreate, OperationBuffer: createOpBuffer})
require.NoError(t, err)

doc, err := dochandler.ProcessOperation(updateOp, 0)
require.Error(t, err)
require.Nil(t, doc)
require.Contains(t, err.Error(), "put error")
})

t.Run("error - unpublished operation store delete error", func(t *testing.T) {
store := mocks.NewMockOperationStore(nil)

opt := WithUnpublishedOperationStore(
&mockUnpublishedOpsStore{DeleteErr: fmt.Errorf("delete error")},
[]operation.Type{operation.TypeUpdate})

dochandler, cleanup := getDocumentHandler(store, opt)
require.NotNil(t, dochandler)
defer cleanup()

dochandler.deleteOperationFromUnpublishedOpsStore("suffix")
})

t.Run("error - processor error", func(t *testing.T) {
store := mocks.NewMockOperationStore(nil)

Expand Down Expand Up @@ -526,11 +593,11 @@ func (m *BatchContext) OperationQueue() cutter.OperationQueue {

type cleanup func()

func getDocumentHandler(store processor.OperationStoreClient) (*DocumentHandler, cleanup) {
return getDocumentHandlerWithProtocolClient(store, newMockProtocolClient())
func getDocumentHandler(store *mocks.MockOperationStore, opts ...Option) (*DocumentHandler, cleanup) {
return getDocumentHandlerWithProtocolClient(store, newMockProtocolClient(), opts...)
}

func getDocumentHandlerWithProtocolClient(store processor.OperationStoreClient, protocol *mocks.MockProtocolClient) (*DocumentHandler, cleanup) {
func getDocumentHandlerWithProtocolClient(store *mocks.MockOperationStore, protocol *mocks.MockProtocolClient, opts ...Option) (*DocumentHandler, cleanup) { //nolint: interfacer
processor := processor.New("test", store, protocol)

ctx := &BatchContext{
Expand All @@ -547,7 +614,7 @@ func getDocumentHandlerWithProtocolClient(store processor.OperationStoreClient,
// start go routine for cutting batches
writer.Start()

return New(namespace, []string{alias}, protocol, writer, processor), func() { writer.Stop() }
return New(namespace, []string{alias}, protocol, writer, processor, opts...), func() { writer.Stop() }
}

func getCreateOperation() *model.Operation {
Expand Down Expand Up @@ -842,3 +909,16 @@ func newMockProtocolClient() *mocks.MockProtocolClient {

return pc
}

type mockUnpublishedOpsStore struct {
PutErr error
DeleteErr error
}

func (m *mockUnpublishedOpsStore) Put(_ *operation.AnchoredOperation) error {
return m.PutErr
}

func (m *mockUnpublishedOpsStore) Delete(_ string) error {
return m.DeleteErr
}
41 changes: 39 additions & 2 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type OperationProcessor struct {
name string
store OperationStoreClient
pc protocol.Client

unpublishedOperationStore unpublishedOperationStore
}

// OperationStoreClient defines interface for retrieving all operations related to document.
Expand All @@ -34,9 +36,31 @@ type OperationStoreClient interface {
Get(uniqueSuffix string) ([]*operation.AnchoredOperation, error)
}

type unpublishedOperationStore interface {
// Get retrieves unpublished operation related to document, we can have only one unpublished operation.
Get(uniqueSuffix string) (*operation.AnchoredOperation, error)
}

// New returns new operation processor with the given name. (Note that name is only used for logging.)
func New(name string, store OperationStoreClient, pc protocol.Client) *OperationProcessor {
return &OperationProcessor{name: name, store: store, pc: pc}
func New(name string, store OperationStoreClient, pc protocol.Client, opts ...Option) *OperationProcessor {
op := &OperationProcessor{name: name, store: store, pc: pc, unpublishedOperationStore: &noopUnpublishedOpsStore{}}

// apply options
for _, opt := range opts {
opt(op)
}

return op
}

// Option is an option for operation processor.
type Option func(opts *OperationProcessor)

// WithUnpublishedOperationStore stores unpublished operation into unpublished operation store.
func WithUnpublishedOperationStore(store unpublishedOperationStore) Option {
return func(opts *OperationProcessor) {
opts.unpublishedOperationStore = store
}
}

// Resolve document based on the given unique suffix.
Expand All @@ -48,6 +72,13 @@ func (s *OperationProcessor) Resolve(uniqueSuffix string) (*protocol.ResolutionM
return nil, err
}

unpublishedOp, err := s.unpublishedOperationStore.Get(uniqueSuffix)
if err == nil {
logger.Debugf("[%s] Found unpublished %s operation for unique suffix [%s]", s.name, unpublishedOp.Type, uniqueSuffix)

ops = append(ops, unpublishedOp)
}

sortOperations(ops)

logger.Debugf("[%s] Found %d operations for unique suffix [%s]: %+v", s.name, len(ops), uniqueSuffix, ops)
Expand Down Expand Up @@ -322,3 +353,9 @@ func (s *OperationProcessor) getCommitment(op *operation.AnchoredOperation) (str

return nextCommitment, nil
}

type noopUnpublishedOpsStore struct{}

func (noop *noopUnpublishedOpsStore) Get(_ string) (*operation.AnchoredOperation, error) {
return nil, fmt.Errorf("not found")
}
28 changes: 28 additions & 0 deletions pkg/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ func TestUpdateDocument(t *testing.T) {
require.Equal(t, "special2", didDoc["test"])
})

t.Run("success - with unpublished operation store", func(t *testing.T) {
store, uniqueSuffix := getDefaultStore(recoveryKey, updateKey)

updateOp, _, err := getAnchoredUpdateOperation(updateKey, uniqueSuffix, 1)
require.Nil(t, err)

p := New("test", store, pc, WithUnpublishedOperationStore(&mockUnpublishedOpsStore{AnchoredOp: updateOp}))
result, err := p.Resolve(uniqueSuffix)
require.Nil(t, err)

// check if service type value is updated (done via json patch)
didDoc := document.DidDocumentFromJSONLDObject(result.Doc)
require.Equal(t, "special1", didDoc["test"])
})

t.Run("success - protocol version changed between create/update", func(t *testing.T) {
store, uniqueSuffix := getDefaultStore(recoveryKey, updateKey)

Expand Down Expand Up @@ -1353,3 +1368,16 @@ func newMockProtocolClient() *mocks.MockProtocolClient {

return pc
}

type mockUnpublishedOpsStore struct {
GetErr error
AnchoredOp *operation.AnchoredOperation
}

func (m *mockUnpublishedOpsStore) Get(_ string) (*operation.AnchoredOperation, error) {
if m.GetErr != nil {
return nil, m.GetErr
}

return m.AnchoredOp, nil
}
Loading

0 comments on commit a2e8795

Please sign in to comment.