diff --git a/client/testing/channels_fetcher_mock.go b/client/testing/channels_fetcher_mock.go deleted file mode 100644 index 91248d3c..00000000 --- a/client/testing/channels_fetcher_mock.go +++ /dev/null @@ -1,28 +0,0 @@ -package testing - -import ( - "context" - - "github.com/hyperledger/fabric-protos-go/peer" -) - -type ChannelsFetcherMock struct { - channels []string -} - -func NewChannelsFetcherMock(channels []string) *ChannelsFetcherMock { - return &ChannelsFetcherMock{ - channels: channels, - } -} - -func (c ChannelsFetcherMock) GetChannels(ctx context.Context) (*peer.ChannelQueryResponse, error) { - var channels []*peer.ChannelInfo - for i := range c.channels { - channels = append(channels, &peer.ChannelInfo{ChannelId: c.channels[i]}) - } - - return &peer.ChannelQueryResponse{ - Channels: channels, - }, nil -} diff --git a/observer/block_peer_test.go b/observer/block_peer_test.go index ccb76a48..89386723 100644 --- a/observer/block_peer_test.go +++ b/observer/block_peer_test.go @@ -1,115 +1,128 @@ package observer_test -//var ( -// ctx = context.Background() -// -// channelPeerMock *observer.ChannelPeerMock -// blockPeer *observer.BlockPeer -// -// channelPeerMockConcurrently *observer.ChannelPeerMock -// blockPeerConcurrently *observer.BlockPeer -// blocksByChannels *observer.BlocksByChannels -//) - -//var _ = BeforeSuite(func() { -// const closeChannelWhenAllRead = true -// blockDelivererMock, err := sdkmocks.NewBlocksDelivererMock(fmt.Sprintf("../%s", testdata.Path), closeChannelWhenAllRead) -// Expect(err).ShouldNot(HaveOccurred()) -// -// channelPeerMock = observer.NewChannelPeerMock() -// for channel := range testdata.TestChannels { -// channelPeerMock.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) -// } -// -// blockPeer = observer.NewBlockPeer(channelPeerMock, blockDelivererMock, -// observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) -// -// _, err = blockPeer.Observe(ctx) -// Expect(err).ShouldNot(HaveOccurred()) -// -// channelPeerMockConcurrently = observer.NewChannelPeerMock() -// for channel := range testdata.TestChannels { -// channelPeerMockConcurrently.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) -// } -// -// blockPeerConcurrently = observer.NewBlockPeer(channelPeerMockConcurrently, blockDelivererMock, -// observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) -// -// blocksByChannels, err = blockPeerConcurrently.ObserveByChannels(ctx) -// Expect(err).ShouldNot(HaveOccurred()) -//}) -// -//var _ = Describe("Block Peer", func() { -// Context("Channels number check", func() { -// Context("Block peer", func() { -// It("should return current number of channels", func() { -// channelObservers := blockPeer.ChannelObservers() -// Expect(channelObservers).To(HaveLen(len(testdata.TestChannels))) -// }) -// -// It("should add channels to channelPeerMock", func() { -// newChannels := []string{"channel1", "channel2", "channel3"} -// for _, channel := range newChannels { -// channelPeerMock.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) -// } -// -// // wait to blockPeer observer -// time.Sleep(time.Millisecond * 10) -// -// channelObservers := blockPeer.ChannelObservers() -// Expect(channelObservers).To(HaveLen(len(testdata.TestChannels) + len(newChannels))) -// }) -// }) -// -// Context("Block peer concurrently", func() { -// It("should return current number of channels", func() { -// channelObservers := blockPeerConcurrently.ChannelObservers() -// Expect(channelObservers).To(HaveLen(len(testdata.TestChannels))) -// -// channelsWithBlocks := blocksByChannels.Observe() -// -// for i := 0; i < len(testdata.TestChannels); i++ { -// sampleOrFabcarChannelBlocks := <-channelsWithBlocks -// if sampleOrFabcarChannelBlocks.Name == testdata.SampleChannel { -// Expect(sampleOrFabcarChannelBlocks.Name).To(Equal(testdata.SampleChannel)) -// } else { -// Expect(sampleOrFabcarChannelBlocks.Name).To(Equal(testdata.FabcarChannel)) -// } -// -// Expect(sampleOrFabcarChannelBlocks.Blocks).NotTo(BeNil()) -// } -// }) -// -// It("should add channels to channelPeerMock", func() { -// channel4, channel5, channel6 := "channel4", "channel5", "channel6" -// newChannels := []string{channel4, channel5, channel6} -// for _, channel := range newChannels { -// channelPeerMockConcurrently.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) -// } -// -// // wait to blockPeer observer -// time.Sleep(time.Millisecond * 200) -// -// channelObservers := blockPeerConcurrently.ChannelObservers() -// Expect(channelObservers).To(HaveLen(len(testdata.TestChannels) + len(newChannels))) -// -// channelsWithBlocks := blocksByChannels.Observe() -// -// for i := 0; i < len(newChannels); i++ { -// channel4Or5Or6Blocks := <-channelsWithBlocks -// -// if channel4Or5Or6Blocks.Name == channel4 { -// Expect(channel4Or5Or6Blocks.Name).To(Equal(channel4)) -// Expect(channel4Or5Or6Blocks.Blocks).NotTo(BeNil()) -// } else if channel4Or5Or6Blocks.Name == channel5 { -// Expect(channel4Or5Or6Blocks.Name).To(Equal(channel5)) -// Expect(channel4Or5Or6Blocks.Blocks).NotTo(BeNil()) -// } else { -// Expect(channel4Or5Or6Blocks.Name).To(Equal(channel6)) -// Expect(channel4Or5Or6Blocks.Blocks).NotTo(BeNil()) -// } -// } -// }) -// }) -// }) -//}) +import ( + "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + sdkmocks "github.com/s7techlab/hlf-sdk-go/client/testing" + "github.com/s7techlab/hlf-sdk-go/observer" + testdata "github.com/s7techlab/hlf-sdk-go/testdata/blocks" +) + +var ( + ctx = context.Background() + + channelPeerMock *observer.ChannelPeerMock + blockPeer *observer.BlockPeer + + channelPeerMockConcurrently *observer.ChannelPeerMock + blockPeerConcurrently *observer.BlockPeer + blocksByChannels *observer.BlocksByChannels +) + +var _ = BeforeSuite(func() { + const closeChannelWhenAllRead = true + blockDelivererMock, err := sdkmocks.NewBlocksDelivererMock(fmt.Sprintf("../%s", testdata.Path), closeChannelWhenAllRead) + Expect(err).ShouldNot(HaveOccurred()) + + channelPeerMock = observer.NewChannelPeerMock() + for _, channel := range testdata.Channels { + channelPeerMock.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) + } + + blockPeer = observer.NewBlockPeer(channelPeerMock, blockDelivererMock, + observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) + + _, err = blockPeer.Observe(ctx) + Expect(err).ShouldNot(HaveOccurred()) + + channelPeerMockConcurrently = observer.NewChannelPeerMock() + for _, channel := range testdata.Channels { + channelPeerMockConcurrently.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) + } + + blockPeerConcurrently = observer.NewBlockPeer(channelPeerMockConcurrently, blockDelivererMock, + observer.WithBlockStopRecreateStream(true), observer.WithBlockPeerObservePeriod(time.Nanosecond)) + + blocksByChannels, err = blockPeerConcurrently.ObserveByChannels(ctx) + Expect(err).ShouldNot(HaveOccurred()) +}) + +var _ = Describe("Block Peer", func() { + Context("Channels number check", func() { + Context("Block peer", func() { + It("should return current number of channels", func() { + channelObservers := blockPeer.ChannelObservers() + Expect(channelObservers).To(HaveLen(len(testdata.Channels))) + }) + + It("should add channels to channelPeerMock", func() { + newChannels := []string{"channel1", "channel2", "channel3"} + for _, channel := range newChannels { + channelPeerMock.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) + } + + // wait to blockPeer observer + time.Sleep(time.Millisecond * 10) + + channelObservers := blockPeer.ChannelObservers() + Expect(channelObservers).To(HaveLen(len(testdata.Channels) + len(newChannels))) + }) + }) + + Context("Block peer concurrently", func() { + It("should return current number of channels", func() { + channelObservers := blockPeerConcurrently.ChannelObservers() + Expect(channelObservers).To(HaveLen(len(testdata.Channels))) + + channelsWithBlocks := blocksByChannels.Observe() + + for i := 0; i < len(testdata.Channels); i++ { + sampleOrFabcarChannelBlocks := <-channelsWithBlocks + if sampleOrFabcarChannelBlocks.Name == testdata.SampleChannel { + Expect(sampleOrFabcarChannelBlocks.Name).To(Equal(testdata.SampleChannel)) + } else { + Expect(sampleOrFabcarChannelBlocks.Name).To(Equal(testdata.FabcarChannel)) + } + + Expect(sampleOrFabcarChannelBlocks.Blocks).NotTo(BeNil()) + } + }) + + It("should add channels to channelPeerMock", func() { + channel4, channel5, channel6 := "channel4", "channel5", "channel6" + newChannels := []string{channel4, channel5, channel6} + for _, channel := range newChannels { + channelPeerMockConcurrently.UpdateChannelInfo(&observer.ChannelInfo{Channel: channel}) + } + + // wait to blockPeer observer + time.Sleep(time.Millisecond * 200) + + channelObservers := blockPeerConcurrently.ChannelObservers() + Expect(channelObservers).To(HaveLen(len(testdata.Channels) + len(newChannels))) + + channelsWithBlocks := blocksByChannels.Observe() + + for i := 0; i < len(newChannels); i++ { + channel4Or5Or6Blocks := <-channelsWithBlocks + + if channel4Or5Or6Blocks.Name == channel4 { + Expect(channel4Or5Or6Blocks.Name).To(Equal(channel4)) + Expect(channel4Or5Or6Blocks.Blocks).NotTo(BeNil()) + } else if channel4Or5Or6Blocks.Name == channel5 { + Expect(channel4Or5Or6Blocks.Name).To(Equal(channel5)) + Expect(channel4Or5Or6Blocks.Blocks).NotTo(BeNil()) + } else { + Expect(channel4Or5Or6Blocks.Name).To(Equal(channel6)) + Expect(channel4Or5Or6Blocks.Blocks).NotTo(BeNil()) + } + } + }) + }) + }) +}) diff --git a/observer/channel_peer.go b/observer/channel_peer.go index 9b50300c..2fbe4a22 100644 --- a/observer/channel_peer.go +++ b/observer/channel_peer.go @@ -108,7 +108,7 @@ func (cp *ChannelPeer) Observe(ctx context.Context) { return } - // ctxObserve using for nested controll process without stopped primary context + // ctxObserve using for nested control process without stopped primary context ctxObserve, cancel := context.WithCancel(context.Background()) cp.cancelObserve = cancel diff --git a/observer/channel_peer_fetcher_mock.go b/observer/channel_peer_fetcher_mock.go new file mode 100644 index 00000000..a1e51082 --- /dev/null +++ b/observer/channel_peer_fetcher_mock.go @@ -0,0 +1,41 @@ +package observer + +import ( + "context" + "fmt" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/peer" +) + +type ChannelPeerFetcherMock struct { + channels map[string]uint64 +} + +func NewChannelPeerFetcherMock(channels map[string]uint64) *ChannelPeerFetcherMock { + return &ChannelPeerFetcherMock{ + channels: channels, + } +} + +func (c *ChannelPeerFetcherMock) GetChannels(ctx context.Context) (*peer.ChannelQueryResponse, error) { + var channels []*peer.ChannelInfo + for channelName := range c.channels { + channels = append(channels, &peer.ChannelInfo{ChannelId: channelName}) + } + + return &peer.ChannelQueryResponse{ + Channels: channels, + }, nil +} + +func (c *ChannelPeerFetcherMock) GetChainInfo(ctx context.Context, channel string) (*common.BlockchainInfo, error) { + chHeight, exists := c.channels[channel] + if !exists { + return nil, fmt.Errorf("channel '%s' does not exist", channel) + } + + return &common.BlockchainInfo{ + Height: chHeight, + }, nil +} diff --git a/observer/channel_peer_test.go b/observer/channel_peer_test.go new file mode 100644 index 00000000..1e61264c --- /dev/null +++ b/observer/channel_peer_test.go @@ -0,0 +1,60 @@ +package observer_test + +import ( + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/s7techlab/hlf-sdk-go/observer" + testdata "github.com/s7techlab/hlf-sdk-go/testdata/blocks" +) + +var _ = Describe("Channel peer", func() { + var ( + channelPeerFetcherMock observer.PeerChannelsFetcher + ) + BeforeEach(func() { + channelPeerFetcherMock = observer.NewChannelPeerFetcherMock(testdata.ChannelsHeights) + }) + + It("default channel peer, no channel matcher", func() { + channelPeer, err := observer.NewChannelPeer(channelPeerFetcherMock) + Expect(err).To(BeNil()) + + channelPeer.Observe(ctx) + time.Sleep(time.Millisecond * 100) + + channelsMap := channelPeer.Channels() + + sampleChannelInfo, exist := channelsMap[testdata.SampleChannel] + Expect(exist).To(BeTrue()) + Expect(sampleChannelInfo.Channel).To(Equal(testdata.SampleChannel)) + Expect(sampleChannelInfo.Height).To(Equal(testdata.SampleChannelHeight)) + + fabcarChannelInfo, exist := channelsMap[testdata.FabcarChannel] + Expect(exist).To(BeTrue()) + Expect(fabcarChannelInfo.Channel).To(Equal(testdata.FabcarChannel)) + Expect(fabcarChannelInfo.Height).To(Equal(testdata.FabcarChannelHeight)) + }) + + It("default channel peer, with channel matcher, exclude Fabcar", func() { + channelPeer, err := observer.NewChannelPeer(channelPeerFetcherMock, + observer.WithChannels([]observer.ChannelToMatch{{MatchPattern: testdata.SampleChannel}})) + Expect(err).To(BeNil()) + + channelPeer.Observe(ctx) + time.Sleep(time.Millisecond * 100) + + channelsMap := channelPeer.Channels() + + sampleChannelInfo, exist := channelsMap[testdata.SampleChannel] + Expect(exist).To(BeTrue()) + Expect(sampleChannelInfo.Channel).To(Equal(testdata.SampleChannel)) + Expect(sampleChannelInfo.Height).To(Equal(testdata.SampleChannelHeight)) + + fabcarChannelInfo, exist := channelsMap[testdata.FabcarChannel] + Expect(exist).To(BeFalse()) + Expect(fabcarChannelInfo).To(BeNil()) + }) +}) diff --git a/observer/channel_test.go b/observer/channel_test.go deleted file mode 100644 index b59e75f8..00000000 --- a/observer/channel_test.go +++ /dev/null @@ -1,143 +0,0 @@ -package observer_test - -// import ( -// "context" -// "time" -// -// "github.com/s7techlab/hlf-sdk-go/api" -// sdkMocks "github.com/s7techlab/hlf-sdk-go/client/testing" -// "go.uber.org/zap" -// -// blocksubscriber "go.b2bchain.tech/explorer/observer" -// "go.b2bchain.tech/explorer/testdata" -//) -// -//var _ = Describe("BlockSubscriber", func() { -// Context("Block parsing", func() { -// var ( -// getChannelsMock api.ChannelsFetcher -// bdMock api.BlocksDeliverer -// -// mspID, mspHost string -// err error -// ) -// BeforeEach(func() { -// const closeChannelWhenAllRead = true -// -// bdMock, err = sdkMocks.NewBlocksDelivererMock("../testdata/blocks", closeChannelWhenAllRead) -// Expect(err).To(BeNil()) -// -// var channelNames []string -// for k := range testdata.TestChannels { -// channelNames = append(channelNames, k) -// } -// -// getChannelsMock = sdkMocks.NewChannelsFetcherMock(channelNames) -// }) -// -// Context("should subscribe to channels and parse blocks", func() { -// It("default configuration. no channel provided, no auto subscribe", func() { -// bs, err := blocksubscriber.NewPeerBlockSubscriber(bdMock, getChannelsMock, mspID, mspHost, zap.NewExample()) -// Expect(err).To(BeNil()) -// -// ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) -// defer cancel() -// -// parsedBlocks, err := bs.Start(ctx) -// Expect(err).To(BeNil()) -// -// totalBlocks := 0 -// for range parsedBlocks { -// totalBlocks++ -// } -// Expect(totalBlocks).To(Equal(0)) -// }) -// -// It("default configuration, with auto subscribe to all channels", func() { -// bs, err := blocksubscriber.NewPeerBlockSubscriber(bdMock, getChannelsMock, mspID, mspHost, zap.NewExample(), -// blocksubscriber.SetObserveNewChannels(true)) -// Expect(err).To(BeNil()) -// -// ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) -// defer cancel() -// -// parsedBlocks, err := bs.Start(ctx) -// Expect(err).To(BeNil()) -// -// totalBlocks := 0 -// for range parsedBlocks { -// totalBlocks++ -// } -// Expect(totalBlocks).To(Equal(13 + 9 + 20 + 16)) // 58 - total blocks in folder -// }) -// -// Context("test channel settings", func() { -// It("with one channel and disabled auto subscribe", func() { -// seekFromBlock := 10 -// channelName := "asset-transfer-basic" -// // basic channel have 13 blocks, we want to read from observer with index 10 -// bs, err := blocksubscriber.NewPeerBlockSubscriber( -// bdMock, -// getChannelsMock, -// mspID, mspHost, -// zap.NewExample(), -// blocksubscriber.WithChannelSetting(blocksubscriber.ChannelSetting{ -// NamePattern: channelName, -// FromBlock: uint64(seekFromBlock), -// }), -// ) -// Expect(err).To(BeNil()) -// -// ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) -// defer cancel() -// -// parsedBlocks, err := bs.Start(ctx) -// Expect(err).To(BeNil()) -// -// totalBlocks := 0 -// for v := range parsedBlocks { -// Expect(v.Block.Header.Number).To(Equal(uint64(seekFromBlock + totalBlocks))) -// Expect(v.ChannelName).To(Equal(channelName)) -// totalBlocks++ -// } -// Expect(totalBlocks).To(Equal(3)) -// }) -// -// It("with two configured channel(one with regex) and disabled auto subscribe", func() { -// bs, err := blocksubscriber.NewPeerBlockSubscriber( -// bdMock, -// getChannelsMock, -// mspID, mspHost, -// zap.NewExample(), -// blocksubscriber.WithChannelSetting( -// // expect 3 blocks from here -// blocksubscriber.ChannelSetting{ -// NamePattern: "asset-transfer-basic", -// // basic channel have 13 blocks, we want to read from observer with index 10 -// FromBlock: 10, -// }, -// // and in sbe(20) + secured-agreement(16) -// blocksubscriber.ChannelSetting{ -// NamePattern: "/^asset-transfer-s*/", -// FromBlock: 0, -// }), -// // 3+20+16=39 total blocks -// ) -// Expect(err).To(BeNil()) -// -// ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) -// defer cancel() -// -// parsedBlocks, err := bs.Start(ctx) -// Expect(err).To(BeNil()) -// -// totalBlocks := 0 -// for range parsedBlocks { -// totalBlocks++ -// } -// Expect(totalBlocks).To(Equal(39)) -// }) -// }) -// }) -// }) -//}) diff --git a/testdata/blocks/blocks.go b/testdata/blocks/blocks.go index 79f38a29..16cbc12b 100644 --- a/testdata/blocks/blocks.go +++ b/testdata/blocks/blocks.go @@ -1,6 +1,24 @@ package blocks -// +const ( + Path = "testdata/blocks/fixtures" + + SampleChannel = "sample-channel" + SampleChannelHeight uint64 = 9 + FabcarChannel = "fabcar-channel" + FabcarChannelHeight uint64 = 11 + + SampleChaincode = "sample" + FabcarChaincode = "fabcar" +) + +var ( + Channels = []string{SampleChannel, FabcarChannel} + ChannelsHeights = map[string]uint64{SampleChannel: SampleChannelHeight, FabcarChannel: FabcarChannelHeight} + + Chaincodes = []string{SampleChaincode, FabcarChaincode} +) + //import ( // "reflect" // "sort" diff --git a/testdata/blocks/db.go b/testdata/blocks/db.go deleted file mode 100644 index bf546834..00000000 --- a/testdata/blocks/db.go +++ /dev/null @@ -1,61 +0,0 @@ -package blocks - -import ( - "time" - - "github.com/s7techlab/hlf-sdk-go/observer/transform" -) - -const ( - syscc = "syscc" -) - -var chaincodes = [][]interface{}{ - {transform.LifecycleChaincodeName, syscc}, - //{FabcarChaincode, CCVersion}, - //{SampleChaincode, CCVersion}, -} - -var now = time.Now() - -var channels = [][]interface{}{ - //{FabcarChannel, now, "{}"}, - //{SampleChannel, now, "{}"}, -} - -//func InitDBForAPITests(db *sql.DB) error { -// -// for _, cc := range chaincodes { -// _, err := db.Exec(` -// insert into chaincode (id, version) -// values ($1, $2) -// `, cc...) -// if err != nil { -// return fmt.Errorf("add chaincode: %w", err) -// } -// } -// -// for _, c := range channels { -// _, err := db.Exec(` -// insert into channel (id, created_at, config_parsed) -// values ($1, $2, $3) -// `, c...) -// if err != nil { -// return fmt.Errorf("add channel: %w", err) -// } -// } -// -// for _, cc := range chaincodes { -// for _, c := range channels { -// _, err := db.Exec(` -// insert into channel_chaincodes (channel_id, chaincode_id, chaincode_version, created_at) -// values ($1, $2, $3, $4) -// `, c[0], cc[0], cc[1], now) -// if err != nil { -// return fmt.Errorf("add channel_chaincode: %w", err) -// } -// } -// } -// -// return nil -//}