diff --git a/pkg/dochandler/handler.go b/pkg/dochandler/handler.go index da7c43e4..f9bc8a8c 100644 --- a/pkg/dochandler/handler.go +++ b/pkg/dochandler/handler.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "strings" + "time" "github.com/trustbloc/edge-core/pkg/log" @@ -49,6 +50,16 @@ type DocumentHandler struct { aliases []string // namespace aliases domain string label string + + unpublishedOperationStore unpublishedOperationStore + unpublishedOperationTypes []operation.Type +} + +type unpublishedOperationStore interface { + // Put saves operation into unpublished operation store. + Put(op *operation.AnchoredOperation) error + // Delete deletes operation from unpublished operation store. + Delete(suffix string) error } // operationProcessor is an interface which resolves the document based on the ID. @@ -78,14 +89,24 @@ func WithLabel(label string) Option { } } +// WithUnpublishedOperationStore stores unpublished operation into unpublished operation store. +func WithUnpublishedOperationStore(store unpublishedOperationStore, operationTypes []operation.Type) Option { + return func(opts *DocumentHandler) { + opts.unpublishedOperationStore = store + opts.unpublishedOperationTypes = operationTypes + } +} + // New creates a new document handler with the context. func New(namespace string, aliases []string, pc protocol.Client, writer batchWriter, processor operationProcessor, opts ...Option) *DocumentHandler { dh := &DocumentHandler{ - protocol: pc, - processor: processor, - writer: writer, - namespace: namespace, - aliases: aliases, + protocol: pc, + processor: processor, + writer: writer, + namespace: namespace, + aliases: aliases, + unpublishedOperationStore: &noopUnpublishedOpsStore{}, + unpublishedOperationTypes: []operation.Type{}, } // apply options @@ -102,7 +123,7 @@ func (r *DocumentHandler) Namespace() string { } // ProcessOperation validates operation and adds it to the batch. -func (r *DocumentHandler) ProcessOperation(operationBuffer []byte, protocolGenesisTime uint64) (*document.ResolutionResult, error) { +func (r *DocumentHandler) ProcessOperation(operationBuffer []byte, protocolGenesisTime uint64) (*document.ResolutionResult, error) { //nolint:gocyclo pv, err := r.protocol.Get(protocolGenesisTime) if err != nil { return nil, err @@ -114,18 +135,19 @@ func (r *DocumentHandler) ProcessOperation(operationBuffer []byte, protocolGenes } // perform validation for operation request - if err := r.validateOperation(op, pv); err != nil { + err = r.validateOperation(op, pv) + if err != nil { logger.Warnf("Failed to validate operation: %s", err.Error()) return nil, err } if op.Type != operation.TypeCreate { - internalResult, err := r.processor.Resolve(op.UniqueSuffix) - if err != nil { - logger.Debugf("Failed to resolve suffix[%s] for operation type[%s]: %s", op.UniqueSuffix, op.Type, err.Error()) + internalResult, innerErr := r.processor.Resolve(op.UniqueSuffix) + if innerErr != nil { + logger.Debugf("Failed to resolve suffix[%s] for operation type[%s]: %w", op.UniqueSuffix, op.Type, innerErr) - return nil, err + return nil, innerErr } if op.Type == operation.TypeUpdate || op.Type == operation.TypeDeactivate { @@ -133,10 +155,17 @@ func (r *DocumentHandler) ProcessOperation(operationBuffer []byte, protocolGenes } } + err = r.addOperationToUnpublishedOpsStore(op, pv) + if err != nil { + return nil, fmt.Errorf("failed to add operation for suffix[%s] to unpublished operation store: %s", op.UniqueSuffix, err.Error()) + } + // validated operation will be added to the batch if err := r.addToBatch(op, pv.Protocol().GenesisTime); err != nil { logger.Errorf("Failed to add operation to batch: %s", err.Error()) + r.deleteOperationFromUnpublishedOpsStore(op.UniqueSuffix) + return nil, err } @@ -150,6 +179,40 @@ func (r *DocumentHandler) ProcessOperation(operationBuffer []byte, protocolGenes return nil, nil } +func (r *DocumentHandler) addOperationToUnpublishedOpsStore(op *operation.Operation, pv protocol.Version) error { + if !contains(r.unpublishedOperationTypes, op.Type) { + return nil + } + + unpublishedOp := &operation.AnchoredOperation{ + Type: op.Type, + UniqueSuffix: op.UniqueSuffix, + OperationBuffer: op.OperationBuffer, + TransactionTime: uint64(time.Now().Unix()), + ProtocolGenesisTime: pv.Protocol().GenesisTime, + AnchorOrigin: op.AnchorOrigin, + } + + return r.unpublishedOperationStore.Put(unpublishedOp) +} + +func (r *DocumentHandler) deleteOperationFromUnpublishedOpsStore(suffix string) { + err := r.unpublishedOperationStore.Delete(suffix) + if err != nil { + logger.Warnf("Failed to delete operation from unpublished store: %s", err.Error()) + } +} + +func contains(values []operation.Type, value operation.Type) bool { + for _, v := range values { + if v == value { + return true + } + } + + return false +} + func (r *DocumentHandler) getCreateResult(op *operation.Operation, pv protocol.Version) (*protocol.ResolutionModel, error) { // we can use operation applier to generate create response even though operation is not anchored yet anchored := &operation.AnchoredOperation{ @@ -406,3 +469,14 @@ func getSuffix(namespace, idOrDocument string) (string, error) { return idOrDocument[adjustedPos:], nil } + +type noopUnpublishedOpsStore struct { +} + +func (noop *noopUnpublishedOpsStore) Put(_ *operation.AnchoredOperation) error { + return nil +} + +func (noop *noopUnpublishedOpsStore) Delete(_ string) error { + return nil +} diff --git a/pkg/dochandler/handler_test.go b/pkg/dochandler/handler_test.go index 90fcf6d4..1622a660 100644 --- a/pkg/dochandler/handler_test.go +++ b/pkg/dochandler/handler_test.go @@ -113,6 +113,73 @@ func TestDocumentHandler_ProcessOperation_Update(t *testing.T) { require.Nil(t, doc) }) + t.Run("success - unpublished operation store option", func(t *testing.T) { + store := mocks.NewMockOperationStore(nil) + + opt := WithUnpublishedOperationStore(&noopUnpublishedOpsStore{}, []operation.Type{operation.TypeUpdate}) + + dochandler, cleanup := getDocumentHandler(store, opt) + require.NotNil(t, dochandler) + defer cleanup() + + createOp := getCreateOperation() + + createOpBuffer, err := json.Marshal(createOp) + require.NoError(t, err) + + updateOp, err := generateUpdateOperation(createOp.UniqueSuffix) + require.NoError(t, err) + + err = store.Put(&operation.AnchoredOperation{UniqueSuffix: createOp.UniqueSuffix, Type: operation.TypeCreate, OperationBuffer: createOpBuffer}) + require.NoError(t, err) + + doc, err := dochandler.ProcessOperation(updateOp, 0) + require.NoError(t, err) + require.Nil(t, doc) + }) + + t.Run("error - unpublished operation store put error", func(t *testing.T) { + store := mocks.NewMockOperationStore(nil) + + opt := WithUnpublishedOperationStore( + &mockUnpublishedOpsStore{PutErr: fmt.Errorf("put error")}, + []operation.Type{operation.TypeUpdate}) + + dochandler, cleanup := getDocumentHandler(store, opt) + require.NotNil(t, dochandler) + defer cleanup() + + createOp := getCreateOperation() + + createOpBuffer, err := json.Marshal(createOp) + require.NoError(t, err) + + updateOp, err := generateUpdateOperation(createOp.UniqueSuffix) + require.NoError(t, err) + + err = store.Put(&operation.AnchoredOperation{UniqueSuffix: createOp.UniqueSuffix, Type: operation.TypeCreate, OperationBuffer: createOpBuffer}) + require.NoError(t, err) + + doc, err := dochandler.ProcessOperation(updateOp, 0) + require.Error(t, err) + require.Nil(t, doc) + require.Contains(t, err.Error(), "put error") + }) + + t.Run("error - unpublished operation store delete error", func(t *testing.T) { + store := mocks.NewMockOperationStore(nil) + + opt := WithUnpublishedOperationStore( + &mockUnpublishedOpsStore{DeleteErr: fmt.Errorf("delete error")}, + []operation.Type{operation.TypeUpdate}) + + dochandler, cleanup := getDocumentHandler(store, opt) + require.NotNil(t, dochandler) + defer cleanup() + + dochandler.deleteOperationFromUnpublishedOpsStore("suffix") + }) + t.Run("error - processor error", func(t *testing.T) { store := mocks.NewMockOperationStore(nil) @@ -526,11 +593,11 @@ func (m *BatchContext) OperationQueue() cutter.OperationQueue { type cleanup func() -func getDocumentHandler(store processor.OperationStoreClient) (*DocumentHandler, cleanup) { - return getDocumentHandlerWithProtocolClient(store, newMockProtocolClient()) +func getDocumentHandler(store *mocks.MockOperationStore, opts ...Option) (*DocumentHandler, cleanup) { + return getDocumentHandlerWithProtocolClient(store, newMockProtocolClient(), opts...) } -func getDocumentHandlerWithProtocolClient(store processor.OperationStoreClient, protocol *mocks.MockProtocolClient) (*DocumentHandler, cleanup) { +func getDocumentHandlerWithProtocolClient(store *mocks.MockOperationStore, protocol *mocks.MockProtocolClient, opts ...Option) (*DocumentHandler, cleanup) { //nolint: interfacer processor := processor.New("test", store, protocol) ctx := &BatchContext{ @@ -547,7 +614,7 @@ func getDocumentHandlerWithProtocolClient(store processor.OperationStoreClient, // start go routine for cutting batches writer.Start() - return New(namespace, []string{alias}, protocol, writer, processor), func() { writer.Stop() } + return New(namespace, []string{alias}, protocol, writer, processor, opts...), func() { writer.Stop() } } func getCreateOperation() *model.Operation { @@ -842,3 +909,16 @@ func newMockProtocolClient() *mocks.MockProtocolClient { return pc } + +type mockUnpublishedOpsStore struct { + PutErr error + DeleteErr error +} + +func (m *mockUnpublishedOpsStore) Put(_ *operation.AnchoredOperation) error { + return m.PutErr +} + +func (m *mockUnpublishedOpsStore) Delete(_ string) error { + return m.DeleteErr +} diff --git a/pkg/processor/processor.go b/pkg/processor/processor.go index 1a9f8ae5..315d0fd0 100644 --- a/pkg/processor/processor.go +++ b/pkg/processor/processor.go @@ -26,6 +26,8 @@ type OperationProcessor struct { name string store OperationStoreClient pc protocol.Client + + unpublishedOperationStore unpublishedOperationStore } // OperationStoreClient defines interface for retrieving all operations related to document. @@ -34,9 +36,31 @@ type OperationStoreClient interface { Get(uniqueSuffix string) ([]*operation.AnchoredOperation, error) } +type unpublishedOperationStore interface { + // Get retrieves unpublished operation related to document, we can have only one unpublished operation. + Get(uniqueSuffix string) (*operation.AnchoredOperation, error) +} + // New returns new operation processor with the given name. (Note that name is only used for logging.) -func New(name string, store OperationStoreClient, pc protocol.Client) *OperationProcessor { - return &OperationProcessor{name: name, store: store, pc: pc} +func New(name string, store OperationStoreClient, pc protocol.Client, opts ...Option) *OperationProcessor { + op := &OperationProcessor{name: name, store: store, pc: pc, unpublishedOperationStore: &noopUnpublishedOpsStore{}} + + // apply options + for _, opt := range opts { + opt(op) + } + + return op +} + +// Option is an option for operation processor. +type Option func(opts *OperationProcessor) + +// WithUnpublishedOperationStore stores unpublished operation into unpublished operation store. +func WithUnpublishedOperationStore(store unpublishedOperationStore) Option { + return func(opts *OperationProcessor) { + opts.unpublishedOperationStore = store + } } // Resolve document based on the given unique suffix. @@ -48,6 +72,13 @@ func (s *OperationProcessor) Resolve(uniqueSuffix string) (*protocol.ResolutionM return nil, err } + unpublishedOp, err := s.unpublishedOperationStore.Get(uniqueSuffix) + if err == nil { + logger.Debugf("[%s] Found unpublished %s operation for unique suffix [%s]", s.name, unpublishedOp.Type, uniqueSuffix) + + ops = append(ops, unpublishedOp) + } + sortOperations(ops) logger.Debugf("[%s] Found %d operations for unique suffix [%s]: %+v", s.name, len(ops), uniqueSuffix, ops) @@ -322,3 +353,9 @@ func (s *OperationProcessor) getCommitment(op *operation.AnchoredOperation) (str return nextCommitment, nil } + +type noopUnpublishedOpsStore struct{} + +func (noop *noopUnpublishedOpsStore) Get(_ string) (*operation.AnchoredOperation, error) { + return nil, fmt.Errorf("not found") +} diff --git a/pkg/processor/processor_test.go b/pkg/processor/processor_test.go index 37588155..27f60029 100644 --- a/pkg/processor/processor_test.go +++ b/pkg/processor/processor_test.go @@ -160,6 +160,21 @@ func TestUpdateDocument(t *testing.T) { require.Equal(t, "special2", didDoc["test"]) }) + t.Run("success - with unpublished operation store", func(t *testing.T) { + store, uniqueSuffix := getDefaultStore(recoveryKey, updateKey) + + updateOp, _, err := getAnchoredUpdateOperation(updateKey, uniqueSuffix, 1) + require.Nil(t, err) + + p := New("test", store, pc, WithUnpublishedOperationStore(&mockUnpublishedOpsStore{AnchoredOp: updateOp})) + result, err := p.Resolve(uniqueSuffix) + require.Nil(t, err) + + // check if service type value is updated (done via json patch) + didDoc := document.DidDocumentFromJSONLDObject(result.Doc) + require.Equal(t, "special1", didDoc["test"]) + }) + t.Run("success - protocol version changed between create/update", func(t *testing.T) { store, uniqueSuffix := getDefaultStore(recoveryKey, updateKey) @@ -1353,3 +1368,16 @@ func newMockProtocolClient() *mocks.MockProtocolClient { return pc } + +type mockUnpublishedOpsStore struct { + GetErr error + AnchoredOp *operation.AnchoredOperation +} + +func (m *mockUnpublishedOpsStore) Get(_ string) (*operation.AnchoredOperation, error) { + if m.GetErr != nil { + return nil, m.GetErr + } + + return m.AnchoredOp, nil +} diff --git a/pkg/versions/1_0/txnprocessor/txnprocessor.go b/pkg/versions/1_0/txnprocessor/txnprocessor.go index 336280b8..a26fc93e 100644 --- a/pkg/versions/1_0/txnprocessor/txnprocessor.go +++ b/pkg/versions/1_0/txnprocessor/txnprocessor.go @@ -24,6 +24,11 @@ type OperationStore interface { Put(ops []*operation.AnchoredOperation) error } +type unpublishedOperationStore interface { + // DeleteAll deletes unpublished operation for provided suffixes. + DeleteAll(suffixes []string) error +} + // Providers contains the providers required by the TxnProcessor. type Providers struct { OpStore OperationStore @@ -33,12 +38,36 @@ type Providers struct { // TxnProcessor processes Sidetree transactions by persisting them to an operation store. type TxnProcessor struct { *Providers + + unpublishedOperationStore unpublishedOperationStore + unpublishedOperationTypes []operation.Type } // New returns a new document operation processor. -func New(providers *Providers) *TxnProcessor { - return &TxnProcessor{ +func New(providers *Providers, opts ...Option) *TxnProcessor { + tp := &TxnProcessor{ Providers: providers, + + unpublishedOperationStore: &noopUnpublishedOpsStore{}, + unpublishedOperationTypes: []operation.Type{}, + } + + // apply options + for _, opt := range opts { + opt(tp) + } + + return tp +} + +// Option is an option for transaction processor. +type Option func(opts *TxnProcessor) + +// WithUnpublishedOperationStore is unpublished operation store option. +func WithUnpublishedOperationStore(store unpublishedOperationStore, opTypes []operation.Type) Option { + return func(opts *TxnProcessor) { + opts.unpublishedOperationStore = store + opts.unpublishedOperationTypes = opTypes } } @@ -59,6 +88,8 @@ func (p *TxnProcessor) processTxnOperations(txnOps []*operation.AnchoredOperatio batchSuffixes := make(map[string]bool) + var unpublishedOpsSuffixes []string + var ops []*operation.AnchoredOperation for _, op := range txnOps { _, ok := batchSuffixes[op.UniqueSuffix] @@ -74,6 +105,10 @@ func (p *TxnProcessor) processTxnOperations(txnOps []*operation.AnchoredOperatio ops = append(ops, updatedOp) batchSuffixes[op.UniqueSuffix] = true + + if containsOperationType(p.unpublishedOperationTypes, op.Type) { + unpublishedOpsSuffixes = append(unpublishedOpsSuffixes, op.UniqueSuffix) + } } err := p.OpStore.Put(ops) @@ -81,6 +116,11 @@ func (p *TxnProcessor) processTxnOperations(txnOps []*operation.AnchoredOperatio return errors.Wrapf(err, "failed to store operation from anchor string[%s]", sidetreeTxn.AnchorString) } + err = p.unpublishedOperationStore.DeleteAll(unpublishedOpsSuffixes) + if err != nil { + return fmt.Errorf("failed to delete unpublished operations for anchor string[%s]: %w", sidetreeTxn.AnchorString, err) + } + return nil } @@ -94,3 +134,19 @@ func updateAnchoredOperation(op *operation.AnchoredOperation, sidetreeTxn txn.Si return op } + +func containsOperationType(values []operation.Type, value operation.Type) bool { + for _, v := range values { + if v == value { + return true + } + } + + return false +} + +type noopUnpublishedOpsStore struct{} + +func (noop *noopUnpublishedOpsStore) DeleteAll(_ []string) error { + return nil +} diff --git a/pkg/versions/1_0/txnprocessor/txnprocessor_test.go b/pkg/versions/1_0/txnprocessor/txnprocessor_test.go index c8fa4700..ef0585d4 100644 --- a/pkg/versions/1_0/txnprocessor/txnprocessor_test.go +++ b/pkg/versions/1_0/txnprocessor/txnprocessor_test.go @@ -66,6 +66,41 @@ func TestProcessTxnOperations(t *testing.T) { require.NoError(t, err) }) + t.Run("success - with unpublished operation store option", func(t *testing.T) { + providers := &Providers{ + OperationProtocolProvider: &mockTxnOpsProvider{}, + OpStore: &mockOperationStore{}, + } + + opt := WithUnpublishedOperationStore(&mockUnpublishedOpsStore{}, []operation.Type{operation.TypeUpdate}) + + p := New(providers, opt) + batchOps, err := p.OperationProtocolProvider.GetTxnOperations(&txn.SidetreeTxn{AnchorString: anchorString}) + require.NoError(t, err) + + err = p.processTxnOperations(batchOps, txn.SidetreeTxn{AnchorString: anchorString}) + require.NoError(t, err) + }) + + t.Run("error - unpublished operation store error", func(t *testing.T) { + providers := &Providers{ + OperationProtocolProvider: &mockTxnOpsProvider{}, + OpStore: &mockOperationStore{}, + } + + opt := WithUnpublishedOperationStore( + &mockUnpublishedOpsStore{DeleteAllErr: fmt.Errorf("delete all error")}, + []operation.Type{operation.TypeUpdate}) + + p := New(providers, opt) + batchOps, err := p.OperationProtocolProvider.GetTxnOperations(&txn.SidetreeTxn{AnchorString: anchorString}) + require.NoError(t, err) + + err = p.processTxnOperations(batchOps, txn.SidetreeTxn{AnchorString: anchorString}) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to delete unpublished operations for anchor string[1.coreIndexURI]: delete all error") + }) + t.Run("success - multiple operations with same suffix in transaction operations", func(t *testing.T) { providers := &Providers{ OperationProtocolProvider: &mockTxnOpsProvider{}, @@ -126,7 +161,16 @@ func (m *mockTxnOpsProvider) GetTxnOperations(txn *txn.SidetreeTxn) ([]*operatio op := &operation.AnchoredOperation{ UniqueSuffix: "abc", + Type: operation.TypeUpdate, } return []*operation.AnchoredOperation{op}, nil } + +type mockUnpublishedOpsStore struct { + DeleteAllErr error +} + +func (m *mockUnpublishedOpsStore) DeleteAll(_ []string) error { + return m.DeleteAllErr +}