diff --git a/go.mod b/go.mod index 87b341c..93ce6ea 100644 --- a/go.mod +++ b/go.mod @@ -18,9 +18,9 @@ require ( github.com/onsi/ginkgo v1.14.0 github.com/onsi/gomega v1.27.10 github.com/pkg/errors v0.9.1 - github.com/stretchr/testify v1.7.1-0.20210116013205-6990a05d54c2 + github.com/stretchr/testify v1.8.1 go.opencensus.io v0.22.0 - go.uber.org/zap v1.14.1 + go.uber.org/zap v1.26.0 golang.org/x/crypto v0.14.0 golang.org/x/sync v0.4.0 google.golang.org/genproto v0.0.0-20210122163508-8081c04a3579 @@ -168,7 +168,6 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.4.0 // indirect - github.com/stretchr/objx v0.2.0 // indirect github.com/sykesm/zap-logfmt v0.0.3 // indirect github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect github.com/tedsuo/ifrit v0.0.0-20191009134036-9a97d0632f00 // indirect @@ -181,8 +180,7 @@ require ( github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect go.etcd.io/bbolt v1.3.3 // indirect go.etcd.io/etcd v3.3.13+incompatible // indirect - go.uber.org/atomic v1.6.0 // indirect - go.uber.org/multierr v1.5.0 // indirect + go.uber.org/multierr v1.10.0 // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.13.0 // indirect diff --git a/go.sum b/go.sum index b4d19d0..5fe18b5 100644 --- a/go.sum +++ b/go.sum @@ -546,15 +546,19 @@ github.com/spf13/viper v1.4.0 h1:yXHLWeravcrgGyFSyCgdYpXQ9dR9c/WED3pg1RhxqEU= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= -github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1-0.20210116013205-6990a05d54c2 h1:oevpAKCW58ZYJe1hqfgLqg+1zXmYrQ9xf7HLUdfS+qM= github.com/stretchr/testify v1.7.1-0.20210116013205-6990a05d54c2/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/sykesm/zap-logfmt v0.0.2/go.mod h1:TerDJT124HaO8UTpZ2wJCipJRAKQ9XONM1mzUabIh6M= github.com/sykesm/zap-logfmt v0.0.3 h1:3Wrhf7+I9JEUD8B6KPtDAr9j2jrS0/EPLy7GCE1t/+U= github.com/sykesm/zap-logfmt v0.0.3/go.mod h1:AuBd9xQjAe3URrWT1BBDk2v2onAZHkZkWRMiYZXiZWA= @@ -602,19 +606,20 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= -go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= -go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= -go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= -go.uber.org/zap v1.14.1 h1:nYDKopTbvAPq/NrUVZwT15y2lpROBiLLyoRTbXOYWOo= go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -637,7 +642,6 @@ golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -842,7 +846,6 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= mvdan.cc/interfacer v0.0.0-20180901003855-c20040233aed h1:WX1yoOaKQfddO/mLzdV4wptyWgoH/6hwLs7QHTixo0I= mvdan.cc/interfacer v0.0.0-20180901003855-c20040233aed/go.mod h1:Xkxe497xwlCKkIaQYRfC7CSLworTXY9RMqwhhCm+8Nc= diff --git a/observer/block_peer_common.go b/observer/block_peer_common.go index 1893808..f46de51 100644 --- a/observer/block_peer_common.go +++ b/observer/block_peer_common.go @@ -178,14 +178,19 @@ func (bp *BlockPeer) Stop() { } func (bp *BlockPeer) initChannels(ctx context.Context) { - bp.mu.RLock() - defer bp.mu.RUnlock() - for channel := range bp.peerChannels.Channels() { - if _, ok := bp.channelObservers[channel]; !ok { + bp.mu.RLock() + _, ok := bp.channelObservers[channel] + bp.mu.RUnlock() + + if !ok { bp.logger.Info(`add channel observer`, zap.String(`channel`, channel)) - bp.channelObservers[channel] = bp.peerChannel(ctx, channel) + blockPeerChannel := bp.peerChannel(ctx, channel) + + bp.mu.Lock() + bp.channelObservers[channel] = blockPeerChannel + bp.mu.Unlock() } } } @@ -193,9 +198,9 @@ func (bp *BlockPeer) initChannels(ctx context.Context) { func (bp *BlockPeer) getSeekFrom(channel string) SeekFromFetcher { seekFrom := ChannelSeekOldest() // at first check seekFrom var, if it is empty, check seekFromFetcher - bp.mu.Lock() + bp.mu.RLock() seekFromNum, exist := bp.seekFrom[channel] - bp.mu.Unlock() + bp.mu.RUnlock() if exist { seekFrom = ChannelSeekFrom(seekFromNum - 1) } else { diff --git a/observer/block_peer_common_concurrently.go b/observer/block_peer_common_concurrently.go index 94ca646..4cb1192 100644 --- a/observer/block_peer_common_concurrently.go +++ b/observer/block_peer_common_concurrently.go @@ -51,14 +51,19 @@ func (bp *BlockPeer) ObserveByChannels(ctx context.Context) *BlocksByChannels { } func (bp *BlockPeer) initChannelsConcurrently(ctx context.Context, blocksByChannels *BlocksByChannels) { - bp.mu.Lock() - defer bp.mu.Unlock() - for channel := range bp.peerChannels.Channels() { - if _, ok := bp.channelObservers[channel]; !ok { + bp.mu.RLock() + _, ok := bp.channelObservers[channel] + bp.mu.RUnlock() + + if !ok { bp.logger.Info(`add channel observer concurrently`, zap.String(`channel`, channel)) - bp.channelObservers[channel] = bp.peerChannelConcurrently(ctx, channel, blocksByChannels) + blockPeerChannel := bp.peerChannelConcurrently(ctx, channel, blocksByChannels) + + bp.mu.Lock() + bp.channelObservers[channel] = blockPeerChannel + bp.mu.Unlock() } } } diff --git a/observer/block_peer_parsed.go b/observer/block_peer_parsed.go index 3614db0..704b32a 100644 --- a/observer/block_peer_parsed.go +++ b/observer/block_peer_parsed.go @@ -131,14 +131,19 @@ func (pbp *ParsedBlockPeer) Stop() { } func (pbp *ParsedBlockPeer) initParsedChannels(ctx context.Context) { - pbp.mu.RLock() - defer pbp.mu.RUnlock() - for channel := range pbp.blockPeer.peerChannels.Channels() { - if _, ok := pbp.parsedChannelObservers[channel]; !ok { + pbp.mu.RLock() + _, ok := pbp.parsedChannelObservers[channel] + pbp.mu.RUnlock() + + if !ok { pbp.blockPeer.logger.Info(`add parsed channel observer`, zap.String(`channel`, channel)) - pbp.parsedChannelObservers[channel] = pbp.peerParsedChannel(ctx, channel) + parsedBlockPeerChannel := pbp.peerParsedChannel(ctx, channel) + + pbp.mu.Lock() + pbp.parsedChannelObservers[channel] = parsedBlockPeerChannel + pbp.mu.Unlock() } } } diff --git a/observer/block_peer_parsed_concurrently.go b/observer/block_peer_parsed_concurrently.go index 5d530c7..4311f66 100644 --- a/observer/block_peer_parsed_concurrently.go +++ b/observer/block_peer_parsed_concurrently.go @@ -53,14 +53,19 @@ func (pbp *ParsedBlockPeer) ObserveByChannels(ctx context.Context) *ParsedBlocks } func (pbp *ParsedBlockPeer) initParsedChannelsConcurrently(ctx context.Context, blocksByChannels *ParsedBlocksByChannels) { - pbp.mu.Lock() - defer pbp.mu.Unlock() - for channel := range pbp.blockPeer.peerChannels.Channels() { - if _, ok := pbp.parsedChannelObservers[channel]; !ok { + pbp.mu.RLock() + _, ok := pbp.parsedChannelObservers[channel] + pbp.mu.RUnlock() + + if !ok { pbp.blockPeer.logger.Info(`add parsed channel observer concurrently`, zap.String(`channel`, channel)) - pbp.parsedChannelObservers[channel] = pbp.peerParsedChannelConcurrently(ctx, channel, blocksByChannels) + parsedBlockPeerChannel := pbp.peerParsedChannelConcurrently(ctx, channel, blocksByChannels) + + pbp.mu.Lock() + pbp.parsedChannelObservers[channel] = parsedBlockPeerChannel + pbp.mu.Unlock() } } }