Skip to content

Commit

Permalink
disable-observer-parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Neznaemov committed Sep 28, 2023
1 parent 432df04 commit a31b805
Show file tree
Hide file tree
Showing 16 changed files with 40 additions and 939 deletions.
28 changes: 22 additions & 6 deletions observer/block_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type (
configBlock *common.Block

blocks chan *Block
// turn off parsing common block to hlfproto block
disableParsing bool

isWork bool
cancelObserve context.CancelFunc
Expand All @@ -34,6 +36,9 @@ type (
transformers []BlockTransformer
configBlock *common.Block

// turn off parsing common block to hlfproto block
disableParsing bool

// don't recreate stream if it has not any blocks
stopRecreateStream bool
}
Expand Down Expand Up @@ -65,6 +70,12 @@ func WithChannelStopRecreateStream(stop bool) BlockChannelOpt {
}
}

func WithChannelBlockDisableParsing(disableParsing bool) BlockChannelOpt {
return func(opts *BlockChannelOpts) {
opts.disableParsing = disableParsing
}
}

var DefaultBlockChannelOpts = &BlockChannelOpts{
createStreamWithRetry: CreateBlockStreamWithRetryDelay(DefaultConnectRetryDelay),
transformers: nil, // no transformers
Expand All @@ -89,6 +100,7 @@ func NewBlockChannel(channel string, blocksDeliver api.BlocksDeliverer, seekFrom
createStreamWithRetry: blockChannelOpts.createStreamWithRetry,
transformers: blockChannelOpts.transformers,
stopRecreateStream: blockChannelOpts.stopRecreateStream,
disableParsing: blockChannelOpts.disableParsing,
}

return observer
Expand All @@ -102,7 +114,7 @@ func (c *BlockChannel) Observe(ctx context.Context) (<-chan *Block, error) {
return c.blocks, nil
}

// ctxObserve using for nested controll process without stopped primary context
// ctxObserve using for nested control process without stopped primary context
ctxObserve, cancel := context.WithCancel(ctx)
c.cancelObserve = cancel

Expand Down Expand Up @@ -148,13 +160,17 @@ func (c *BlockChannel) Observe(ctx context.Context) (<-chan *Block, error) {
}

block := &Block{
Channel: c.channel,
Channel: c.channel,
CommonBlock: incomingBlock,
}
block.Block, block.Error = hlfproto.ParseBlock(incomingBlock, hlfproto.WithConfigBlock(c.configBlock))

for pos, transformer := range c.transformers {
if err = transformer.Transform(block); err != nil {
c.logger.Warn(`transformer`, zap.Int(`pos`, pos), zap.Error(err))
if c.disableParsing {
block.Block, block.Error = hlfproto.ParseBlock(incomingBlock, hlfproto.WithConfigBlock(c.configBlock))

for pos, transformer := range c.transformers {
if err = transformer.Transform(block); err != nil {
c.logger.Warn(`transformer`, zap.Int(`pos`, pos), zap.Error(err))
}
}
}

Expand Down
20 changes: 17 additions & 3 deletions observer/block_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type (

blocks chan *Block
blocksByChannels map[string]chan *Block
// turn off parsing common block to hlfproto block
disableParsing bool

isWork bool
cancelObserve context.CancelFunc
Expand All @@ -40,9 +42,11 @@ type (
}

BlockPeerOpts struct {
transformers []BlockTransformer
seekFrom map[string]uint64
configBlocks map[string]*common.Block
transformers []BlockTransformer
seekFrom map[string]uint64
configBlocks map[string]*common.Block
// turn off parsing common block to hlfproto block
disableParsing bool
observePeriod time.Duration
stopRecreateStream bool
logger *zap.Logger
Expand Down Expand Up @@ -94,6 +98,14 @@ func WithBlockPeerObservePeriod(observePeriod time.Duration) BlockPeerOpt {
}
}

func WithBlockPeerDisableParsing(disableParsing bool) BlockPeerOpt {
return func(opts *BlockPeerOpts) {
if disableParsing {
opts.disableParsing = disableParsing
}
}
}

func WithBlockStopRecreateStream(stop bool) BlockPeerOpt {
return func(opts *BlockPeerOpts) {
opts.stopRecreateStream = stop
Expand All @@ -115,6 +127,7 @@ func NewBlockPeer(peerChannels PeerChannels, blockDeliverer api.BlocksDeliverer,
transformers: blockPeerOpts.transformers,
seekFrom: blockPeerOpts.seekFrom,
configBlocks: blockPeerOpts.configBlocks,
disableParsing: blockPeerOpts.disableParsing,
observePeriod: blockPeerOpts.observePeriod,
stopRecreateStream: blockPeerOpts.stopRecreateStream,
logger: blockPeerOpts.logger,
Expand Down Expand Up @@ -212,6 +225,7 @@ func (bp *BlockPeer) peerChannel(ctx context.Context, channel string) *blockPeer
ChannelSeekFrom(seekFrom),
WithChannelBlockTransformers(bp.transformers),
WithChannelConfigBlock(configBlock),
WithChannelBlockDisableParsing(bp.disableParsing),
WithChannelBlockLogger(bp.logger),
WithChannelStopRecreateStream(bp.stopRecreateStream))

Expand Down
128 changes: 0 additions & 128 deletions observer/block_peer_test.go

This file was deleted.

1 change: 1 addition & 0 deletions observer/block_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type (
BlockOriginal *hlfproto.Block // here is original block before transformation if it is, otherwise it's nil
Channel string
Error error
CommonBlock *common.Block
}

CreateBlockStream func(context.Context) (<-chan *common.Block, error)
Expand Down
60 changes: 0 additions & 60 deletions observer/channel_peer_test.go

This file was deleted.

Empty file modified observer/observer.test
100755 → 100644
Empty file.
13 changes: 0 additions & 13 deletions observer/suite_test.go

This file was deleted.

Loading

0 comments on commit a31b805

Please sign in to comment.