-
Notifications
You must be signed in to change notification settings - Fork 21
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Nikita Neznaemov
committed
Sep 28, 2023
1 parent
a31b805
commit 3a3012a
Showing
12 changed files
with
917 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
package observer_test | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
. "github.com/onsi/ginkgo" | ||
. "github.com/onsi/gomega" | ||
|
||
sdkmocks "github.com/s7techlab/hlf-sdk-go/client/testing" | ||
"github.com/s7techlab/hlf-sdk-go/observer" | ||
testdata "github.com/s7techlab/hlf-sdk-go/testdata/blocks" | ||
) | ||
|
||
var ( | ||
ctx = context.Background() | ||
|
||
channelPeerMock *observer.ChannelPeerMock | ||
blockPeer *observer.BlockPeer | ||
|
||
channelPeerMockConcurrently *observer.ChannelPeerMock | ||
blockPeerConcurrently *observer.BlockPeer | ||
blocksByChannels *observer.BlocksByChannels | ||
) | ||
|
||
var _ = BeforeSuite(func() { | ||
const closeChannelWhenAllRead = true | ||
blockDelivererMock, err := sdkmocks.NewBlocksDelivererMock(fmt.Sprintf("../%s", testdata.Path), closeChannelWhenAllRead) | ||
Expect(err).ShouldNot(HaveOccurred()) | ||
|
||
channelPeerMock = observer.NewChannelPeerMock() | ||
for _, channel := range testdata.Channels { | ||
channelPeerMock.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) | ||
} | ||
|
||
blockPeer = observer.NewBlockPeer(channelPeerMock, blockDelivererMock, | ||
observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) | ||
|
||
_, err = blockPeer.Observe(ctx) | ||
Expect(err).ShouldNot(HaveOccurred()) | ||
|
||
channelPeerMockConcurrently = observer.NewChannelPeerMock() | ||
for _, channel := range testdata.Channels { | ||
channelPeerMockConcurrently.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) | ||
} | ||
|
||
blockPeerConcurrently = observer.NewBlockPeer(channelPeerMockConcurrently, blockDelivererMock, | ||
observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) | ||
|
||
blocksByChannels, err = blockPeerConcurrently.ObserveByChannels(ctx) | ||
Expect(err).ShouldNot(HaveOccurred()) | ||
}) | ||
|
||
var _ = Describe("Block Peer", func() { | ||
Context("Channels number check", func() { | ||
Context("Block peer", func() { | ||
It("should return current number of channels", func() { | ||
channelObservers := blockPeer.ChannelObservers() | ||
Expect(channelObservers).To(HaveLen(len(testdata.Channels))) | ||
}) | ||
|
||
It("should add channels to channelPeerMock", func() { | ||
newChannels := []string{"channel1", "channel2", "channel3"} | ||
for _, channel := range newChannels { | ||
channelPeerMock.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) | ||
} | ||
|
||
// wait to blockPeer observer | ||
time.Sleep(time.Millisecond * 10) | ||
|
||
channelObservers := blockPeer.ChannelObservers() | ||
Expect(channelObservers).To(HaveLen(len(testdata.Channels) + len(newChannels))) | ||
}) | ||
}) | ||
|
||
Context("Block peer concurrently", func() { | ||
It("should return current number of channels", func() { | ||
channelObservers := blockPeerConcurrently.ChannelObservers() | ||
Expect(channelObservers).To(HaveLen(len(testdata.Channels))) | ||
|
||
channelsWithBlocks := blocksByChannels.Observe() | ||
|
||
for i := 0; i < len(testdata.Channels); i++ { | ||
sampleOrFabcarChannelBlocks := <-channelsWithBlocks | ||
if sampleOrFabcarChannelBlocks.Name == testdata.SampleChannel { | ||
Expect(sampleOrFabcarChannelBlocks.Name).To(Equal(testdata.SampleChannel)) | ||
} else { | ||
Expect(sampleOrFabcarChannelBlocks.Name).To(Equal(testdata.FabcarChannel)) | ||
} | ||
|
||
Expect(sampleOrFabcarChannelBlocks.Blocks).NotTo(BeNil()) | ||
} | ||
}) | ||
|
||
It("should add channels to channelPeerMock", func() { | ||
channel4, channel5, channel6 := "channel4", "channel5", "channel6" | ||
newChannels := []string{channel4, channel5, channel6} | ||
for _, channel := range newChannels { | ||
channelPeerMockConcurrently.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) | ||
} | ||
|
||
// wait to blockPeer observer | ||
time.Sleep(time.Millisecond * 200) | ||
|
||
channelObservers := blockPeerConcurrently.ChannelObservers() | ||
Expect(channelObservers).To(HaveLen(len(testdata.Channels) + len(newChannels))) | ||
|
||
channelsWithBlocks := blocksByChannels.Observe() | ||
|
||
for i := 0; i < len(newChannels); i++ { | ||
channel4Or5Or6Blocks := <-channelsWithBlocks | ||
|
||
if channel4Or5Or6Blocks.Name == channel4 { | ||
Expect(channel4Or5Or6Blocks.Name).To(Equal(channel4)) | ||
Expect(channel4Or5Or6Blocks.Blocks).NotTo(BeNil()) | ||
} else if channel4Or5Or6Blocks.Name == channel5 { | ||
Expect(channel4Or5Or6Blocks.Name).To(Equal(channel5)) | ||
Expect(channel4Or5Or6Blocks.Blocks).NotTo(BeNil()) | ||
} else { | ||
Expect(channel4Or5Or6Blocks.Name).To(Equal(channel6)) | ||
Expect(channel4Or5Or6Blocks.Blocks).NotTo(BeNil()) | ||
} | ||
} | ||
}) | ||
}) | ||
}) | ||
}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package observer_test | ||
|
||
import ( | ||
"time" | ||
|
||
. "github.com/onsi/ginkgo" | ||
. "github.com/onsi/gomega" | ||
|
||
"github.com/s7techlab/hlf-sdk-go/observer" | ||
testdata "github.com/s7techlab/hlf-sdk-go/testdata/blocks" | ||
) | ||
|
||
var _ = Describe("Channel peer", func() { | ||
var ( | ||
channelPeerFetcherMock observer.PeerChannelsFetcher | ||
) | ||
BeforeEach(func() { | ||
channelPeerFetcherMock = observer.NewChannelPeerFetcherMock(testdata.ChannelsHeights) | ||
}) | ||
|
||
It("default channel peer, no channel matcher", func() { | ||
channelPeer, err := observer.NewChannelPeer(channelPeerFetcherMock) | ||
Expect(err).To(BeNil()) | ||
|
||
channelPeer.Observe(ctx) | ||
time.Sleep(time.Millisecond * 100) | ||
|
||
channelsMap := channelPeer.Channels() | ||
|
||
sampleChannelInfo, exist := channelsMap[testdata.SampleChannel] | ||
Expect(exist).To(BeTrue()) | ||
Expect(sampleChannelInfo.Channel).To(Equal(testdata.SampleChannel)) | ||
Expect(sampleChannelInfo.Height).To(Equal(testdata.SampleChannelHeight)) | ||
|
||
fabcarChannelInfo, exist := channelsMap[testdata.FabcarChannel] | ||
Expect(exist).To(BeTrue()) | ||
Expect(fabcarChannelInfo.Channel).To(Equal(testdata.FabcarChannel)) | ||
Expect(fabcarChannelInfo.Height).To(Equal(testdata.FabcarChannelHeight)) | ||
}) | ||
|
||
It("default channel peer, with channel matcher, exclude Fabcar", func() { | ||
channelPeer, err := observer.NewChannelPeer(channelPeerFetcherMock, | ||
observer.WithChannels([]observer.ChannelToMatch{{MatchPattern: testdata.SampleChannel}})) | ||
Expect(err).To(BeNil()) | ||
|
||
channelPeer.Observe(ctx) | ||
time.Sleep(time.Millisecond * 100) | ||
|
||
channelsMap := channelPeer.Channels() | ||
|
||
sampleChannelInfo, exist := channelsMap[testdata.SampleChannel] | ||
Expect(exist).To(BeTrue()) | ||
Expect(sampleChannelInfo.Channel).To(Equal(testdata.SampleChannel)) | ||
Expect(sampleChannelInfo.Height).To(Equal(testdata.SampleChannelHeight)) | ||
|
||
fabcarChannelInfo, exist := channelsMap[testdata.FabcarChannel] | ||
Expect(exist).To(BeFalse()) | ||
Expect(fabcarChannelInfo).To(BeNil()) | ||
}) | ||
}) |
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,192 @@ | ||
package transform | ||
|
||
import ( | ||
"fmt" | ||
"regexp" | ||
|
||
"github.com/mohae/deepcopy" | ||
|
||
"github.com/s7techlab/hlf-sdk-go/observer" | ||
hlfproto "github.com/s7techlab/hlf-sdk-go/proto" | ||
) | ||
|
||
type ( | ||
Action struct { | ||
match TxActionMatch | ||
inputArgsTransformers []InputArgsTransformer | ||
kvWriteTransformers []KVWriteTransformer | ||
kvReadTransformers []KVReadTransformer | ||
eventTransformers []EventTransformer | ||
actionPayloadTransformers []ActionPayloadTransformer | ||
} | ||
|
||
ActionOpt func(*Action) | ||
|
||
TxActionMatch func(*hlfproto.TransactionAction) bool | ||
TxActionMutate func(*hlfproto.TransactionAction) | ||
) | ||
|
||
func WithInputArgsTransformer(inputArgsTransformers ...InputArgsTransformer) ActionOpt { | ||
return func(a *Action) { | ||
a.inputArgsTransformers = inputArgsTransformers | ||
} | ||
} | ||
|
||
func WithKVWriteTransformer(kvWriteTransformers ...KVWriteTransformer) ActionOpt { | ||
return func(a *Action) { | ||
a.kvWriteTransformers = kvWriteTransformers | ||
} | ||
} | ||
|
||
func WithKVReadTransformer(kvReadTransformers ...KVReadTransformer) ActionOpt { | ||
return func(a *Action) { | ||
a.kvReadTransformers = kvReadTransformers | ||
} | ||
} | ||
|
||
func WithEventTransformer(eventTransformers ...EventTransformer) ActionOpt { | ||
return func(a *Action) { | ||
a.eventTransformers = eventTransformers | ||
} | ||
} | ||
|
||
func WithActionPayloadTransformer(actionTransformers ...ActionPayloadTransformer) ActionOpt { | ||
return func(a *Action) { | ||
a.actionPayloadTransformers = actionTransformers | ||
} | ||
} | ||
|
||
func NewAction(actionMach TxActionMatch, opts ...ActionOpt) *Action { | ||
a := &Action{ | ||
match: actionMach, | ||
} | ||
|
||
for _, opt := range opts { | ||
opt(a) | ||
} | ||
|
||
return a | ||
} | ||
|
||
func (s *Action) Transform(block *observer.Block) error { | ||
if block.Block == nil { | ||
return nil | ||
} | ||
|
||
// if block is transformed, copy of block will be saved to block.BlockOriginal | ||
blockCopy := deepcopy.Copy(block.Block).(*hlfproto.Block) | ||
blockIsTransformed := false | ||
|
||
for _, envelope := range block.Block.Envelopes { | ||
if envelope.Transaction == nil { | ||
continue | ||
} | ||
|
||
for _, txAction := range envelope.Transaction.Actions { | ||
if !s.match(txAction) { | ||
continue | ||
} | ||
|
||
for _, argsTransformer := range s.inputArgsTransformers { | ||
if err := argsTransformer.Transform(txAction.ChaincodeInvocationSpec.ChaincodeSpec.Input.Args); err != nil { | ||
return fmt.Errorf(`args transformer: %w`, err) | ||
} | ||
} | ||
|
||
for _, eventTransformer := range s.eventTransformers { | ||
if err := eventTransformer.Transform(txAction.Event); err != nil { | ||
return fmt.Errorf(`event transformer: %w`, err) | ||
} | ||
} | ||
|
||
for _, rwSet := range txAction.ReadWriteSets { | ||
for _, write := range rwSet.Writes { | ||
for _, kvWriteTransformer := range s.kvWriteTransformers { | ||
origKey := write.Key | ||
if err := kvWriteTransformer.Transform(write); err != nil { | ||
return fmt.Errorf(`KV write transformer with key: %s: %w`, write.Key, err) | ||
} | ||
|
||
if origKey != write.Key { | ||
blockIsTransformed = true | ||
} | ||
} | ||
} | ||
|
||
for _, read := range rwSet.Reads { | ||
for _, kvReadTransform := range s.kvReadTransformers { | ||
origKey := read.Key | ||
if err := kvReadTransform.Transform(read); err != nil { | ||
return fmt.Errorf(`KV read transformer with key: %s: %w`, read.Key, err) | ||
} | ||
if origKey != read.Key { | ||
blockIsTransformed = true | ||
} | ||
} | ||
} | ||
|
||
for _, actionPayloadTransform := range s.actionPayloadTransformers { | ||
if err := actionPayloadTransform.Transform(txAction); err != nil { | ||
return fmt.Errorf(`action payload transform: %w`, err) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
if blockIsTransformed { | ||
block.BlockOriginal = blockCopy | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func TxChaincodeIDMatch(chaincode string) TxActionMatch { | ||
return func(action *hlfproto.TransactionAction) bool { | ||
return action.ChaincodeInvocationSpec.ChaincodeSpec.ChaincodeId.Name == chaincode | ||
} | ||
} | ||
func TxChaincodesIDMatch(chaincodes ...string) TxActionMatch { | ||
return func(action *hlfproto.TransactionAction) bool { | ||
for k := range chaincodes { | ||
if action.ChaincodeInvocationSpec.ChaincodeSpec.ChaincodeId.Name == chaincodes[k] { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
} | ||
|
||
func TxChaincodesIDRegexp(chaincodePattern string) TxActionMatch { | ||
return func(action *hlfproto.TransactionAction) bool { | ||
matched, _ := regexp.MatchString(chaincodePattern, action.ChaincodeInvocationSpec.ChaincodeSpec.ChaincodeId.Name) | ||
|
||
return matched | ||
} | ||
} | ||
func TxChaincodesIDRegexpExclude(chaincodePattern string) TxActionMatch { | ||
return func(action *hlfproto.TransactionAction) bool { | ||
matched, _ := regexp.MatchString(chaincodePattern, action.ChaincodeInvocationSpec.ChaincodeSpec.ChaincodeId.Name) | ||
|
||
return !matched | ||
} | ||
} | ||
|
||
func TxChaincodePatternsIDRegexpExclude(chaincodePatterns ...string) TxActionMatch { | ||
return func(action *hlfproto.TransactionAction) bool { | ||
isInSlice := false | ||
for key := range chaincodePatterns { | ||
matched, _ := regexp.MatchString(chaincodePatterns[key], action.ChaincodeInvocationSpec.ChaincodeSpec.ChaincodeId.Name) | ||
if matched { | ||
isInSlice = true | ||
} | ||
} | ||
return !isInSlice | ||
} | ||
} | ||
|
||
func TxChaincodeAnyMatch() TxActionMatch { | ||
return func(action *hlfproto.TransactionAction) bool { | ||
return true | ||
} | ||
} |
Oops, something went wrong.