Skip to content

Commit

Permalink
Distance tracker reuses libp2p host (#51)
Browse files Browse the repository at this point in the history
Creating a new libp2p host for each distance update is not efficient. Failing to shut down the host after each update leaks memory.

The distance tracker reuses the same p2phost, and the client closes the host if it was not given an existing host.
  • Loading branch information
gammazero authored Aug 2, 2023
1 parent ad89151 commit 7d29b9c
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 44 deletions.
33 changes: 23 additions & 10 deletions pkg/adpub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type client struct {
// adSel is the selector for a single advertisement.
adSel ipld.Node

host host.Host
topic string
host host.Host
ownsHost bool
topic string
}

var ErrContentNotFound = errors.New("content not found at publisher")
Expand All @@ -59,14 +60,19 @@ func NewClient(addrInfo peer.AddrInfo, options ...Option) (Client, error) {
return nil, err
}

h, err := libp2p.New()
if err != nil {
return nil, err
var ownsHost bool
if opts.p2pHost == nil {
opts.p2pHost, err = libp2p.New()
if err != nil {
return nil, err
}
ownsHost = true
}
h.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, time.Hour)

opts.p2pHost.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, time.Hour)

store := newClientStore()
sub, err := dagsync.NewSubscriber(h, store.Batching, store.LinkSystem, opts.topic)
sub, err := dagsync.NewSubscriber(opts.p2pHost, store.Batching, store.LinkSystem, opts.topic)
if err != nil {
return nil, err
}
Expand All @@ -88,8 +94,9 @@ func NewClient(addrInfo peer.AddrInfo, options ...Option) (Client, error) {
store: store,
adSel: adSel,

host: h,
topic: opts.topic,
host: opts.p2pHost,
ownsHost: ownsHost,
topic: opts.topic,
}, nil
}

Expand Down Expand Up @@ -267,5 +274,11 @@ func (c *client) findNextMissingChunkLink(ctx context.Context, next cid.Cid) (ci
}

func (c *client) Close() error {
return c.sub.Close()
err := c.sub.Close()
if c.ownsHost {
if err := c.host.Close(); err != nil {
return err
}
}
return err
}
11 changes: 11 additions & 0 deletions pkg/adpub/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"fmt"
"time"

"github.com/libp2p/go-libp2p/core/host"
)

const (
Expand All @@ -14,6 +16,7 @@ const (
type config struct {
adChainDepthLimit int64
entriesDepthLimit int64
p2pHost host.Host
maxSyncRetry uint64
syncRetryBackoff time.Duration
topic string
Expand Down Expand Up @@ -69,6 +72,14 @@ func WithMaxSyncRetry(r uint64) Option {
}
}

// WithLibp2pHost returns an Option that makes the client use the supplied
// libp2p host rather than creating one of its own. A host supplied this way is
// not closed by the client; leaving the host unset (nil) makes the client
// create, and later close, its own host.
func WithLibp2pHost(h host.Host) Option {
	useHost := func(cfg *config) error {
		cfg.p2pHost = h
		return nil
	}
	return useHost
}

// WithTopicName sets the topic name on which the provider announces advertised
// content. Defaults to '/indexer/ingest/mainnet'.
func WithTopicName(topic string) Option {
Expand Down
95 changes: 63 additions & 32 deletions pkg/dtrack/distance_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipni/go-libipni/pcache"
"github.com/ipni/ipni-cli/pkg/adpub"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand All @@ -34,25 +36,54 @@ type distTrack struct {
errType int
}

func RunDistanceTracker(ctx context.Context, include, exclude map[peer.ID]struct{}, provCache *pcache.ProviderCache, depthLimit int64, updateIn, timeout time.Duration) <-chan DistanceUpdate {
// tracker holds the state shared by one distance-tracking run: the libp2p
// host reused for every publisher client, the provider selection, timing
// parameters, and the channel on which results are delivered.
type tracker struct {
// p2pHost is the single libp2p host handed to every publisher client via
// adpub.WithLibp2pHost; it is closed when run returns.
p2pHost host.Host
// include is the set of providers to track. When empty, providers are
// discovered from the provider cache on each update cycle.
include map[peer.ID]struct{}
// exclude is the set of providers that are never tracked.
exclude map[peer.ID]struct{}
// pcache is refreshed at the start of each update cycle and supplies the
// provider info used for each tracked provider.
pcache *pcache.ProviderCache
// depthLimit is passed to adpub.WithAdChainDepthLimit for each client.
depthLimit int64
// updateIn is the duration the timer is reset to after each update cycle.
updateIn time.Duration
// timeout, when non-zero, bounds the context of each per-provider update.
timeout time.Duration
// updates is the send side of the channel returned to the caller; it is
// closed when run returns.
updates chan<- DistanceUpdate
}

func RunDistanceTracker(ctx context.Context, include, exclude map[peer.ID]struct{}, provCache *pcache.ProviderCache, depthLimit int64, updateIn, timeout time.Duration) (<-chan DistanceUpdate, error) {
p2pHost, err := libp2p.New()
if err != nil {
return nil, err
}

updates := make(chan DistanceUpdate)
go runTracker(ctx, include, exclude, provCache, updateIn, timeout, depthLimit, updates)

return updates
tkr := &tracker{
p2pHost: p2pHost,
include: include,
exclude: exclude,
pcache: provCache,
depthLimit: depthLimit,
updateIn: updateIn,
timeout: timeout,
updates: updates,
}

go tkr.run(ctx)

return updates, nil
}

func runTracker(ctx context.Context, include, exclude map[peer.ID]struct{}, provCache *pcache.ProviderCache, updateIn, timeout time.Duration, depthLimit int64, updates chan<- DistanceUpdate) {
defer close(updates)
func (tkr *tracker) run(ctx context.Context) {
defer close(tkr.updates)
defer tkr.p2pHost.Close()

var lookForNew bool
var tracks map[peer.ID]*distTrack
if len(include) == 0 {
if len(tkr.include) == 0 {
lookForNew = true
tracks = make(map[peer.ID]*distTrack)
} else {
tracks = make(map[peer.ID]*distTrack, len(include))
for pid := range include {
if _, ok := exclude[pid]; ok {
tracks = make(map[peer.ID]*distTrack, len(tkr.include))
for pid := range tkr.include {
if _, ok := tkr.exclude[pid]; ok {
continue
}
tracks[pid] = &distTrack{}
Expand All @@ -65,41 +96,41 @@ func runTracker(ctx context.Context, include, exclude map[peer.ID]struct{}, prov
for {
select {
case <-timer.C:
if err := provCache.Refresh(ctx); err != nil {
if err := tkr.pcache.Refresh(ctx); err != nil {
return
}
if lookForNew {
for _, pinfo := range provCache.List() {
for _, pinfo := range tkr.pcache.List() {
pid := pinfo.AddrInfo.ID
if _, ok := tracks[pid]; !ok {
if _, ok = exclude[pid]; !ok {
if _, ok = tkr.exclude[pid]; !ok {
tracks[pid] = &distTrack{}
}
}
}
}
updateTracks(ctx, provCache, tracks, timeout, depthLimit, updates)
timer.Reset(updateIn)
tkr.updateTracks(ctx, tracks)
timer.Reset(tkr.updateIn)
case <-ctx.Done():
return
}
}
}

func updateTracks(ctx context.Context, provCache *pcache.ProviderCache, tracks map[peer.ID]*distTrack, timeout time.Duration, depthLimit int64, updates chan<- DistanceUpdate) {
func (tkr *tracker) updateTracks(ctx context.Context, tracks map[peer.ID]*distTrack) {
for providerID, track := range tracks {
updateTrack(ctx, providerID, track, provCache, timeout, depthLimit, updates)
tkr.updateTrack(ctx, providerID, track)
}
}

func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *pcache.ProviderCache, timeout time.Duration, depthLimit int64, updates chan<- DistanceUpdate) {
if timeout != 0 {
func (tkr *tracker) updateTrack(ctx context.Context, pid peer.ID, track *distTrack) {
if tkr.timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
ctx, cancel = context.WithTimeout(ctx, tkr.timeout)
defer cancel()
}

pinfo, err := provCache.Get(ctx, pid)
pinfo, err := tkr.pcache.Get(ctx, pid)
if err != nil {
return
}
Expand All @@ -108,7 +139,7 @@ func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *
if track.errType != errTypeNotFound {
track.errType = errTypeNotFound
track.err = fmt.Errorf("provider info not found")
updates <- DistanceUpdate{
tkr.updates <- DistanceUpdate{
ID: pid,
Err: track.err,
}
Expand All @@ -120,7 +151,7 @@ func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *
if track.errType != errTypeNoSync {
track.errType = errTypeNoSync
track.err = fmt.Errorf("provider never synced")
updates <- DistanceUpdate{
tkr.updates <- DistanceUpdate{
ID: pid,
Err: track.err,
}
Expand All @@ -132,20 +163,20 @@ func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *
if track.errType != errTypeNoPublisher {
track.errType = errTypeNoPublisher
track.err = fmt.Errorf("no advertisement publisher")
updates <- DistanceUpdate{
tkr.updates <- DistanceUpdate{
ID: pid,
Err: track.err,
}
}
return
}

pubClient, err := adpub.NewClient(*pinfo.Publisher, adpub.WithAdChainDepthLimit(depthLimit))
pubClient, err := adpub.NewClient(*pinfo.Publisher, adpub.WithAdChainDepthLimit(tkr.depthLimit), adpub.WithLibp2pHost(tkr.p2pHost))
if err != nil {
if track.errType != errTypePubClient {
track.errType = errTypePubClient
track.err = fmt.Errorf("cannot create publisher client: %w", err)
updates <- DistanceUpdate{
tkr.updates <- DistanceUpdate{
ID: pid,
Err: track.err,
}
Expand All @@ -160,7 +191,7 @@ func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *
if track.errType != errTypeUpdate {
track.errType = errTypeUpdate
track.err = fmt.Errorf("cannot get distance update: %w", err)
updates <- DistanceUpdate{
tkr.updates <- DistanceUpdate{
ID: pid,
Err: track.err,
}
Expand All @@ -174,7 +205,7 @@ func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *
if dist != -1 {
track.head = head
}
updates <- DistanceUpdate{
tkr.updates <- DistanceUpdate{
ID: pid,
Distance: dist,
}
Expand All @@ -189,7 +220,7 @@ func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *
if track.errType != errTypeUpdate {
track.errType = errTypeUpdate
track.err = fmt.Errorf("cannot get distance update: %w", err)
updates <- DistanceUpdate{
tkr.updates <- DistanceUpdate{
ID: pid,
Err: track.err,
}
Expand All @@ -201,7 +232,7 @@ func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *
if dist == -1 {
track.dist = -1
track.head = cid.Undef
updates <- DistanceUpdate{
tkr.updates <- DistanceUpdate{
ID: pid,
Distance: -1,
}
Expand All @@ -220,7 +251,7 @@ func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *
if track.errType != errTypeUpdate {
track.errType = errTypeUpdate
track.err = fmt.Errorf("cannot get distance update: %w", err)
updates <- DistanceUpdate{
tkr.updates <- DistanceUpdate{
ID: pid,
Err: track.err,
}
Expand All @@ -232,7 +263,7 @@ func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *
if dist == -1 {
track.dist = -1
track.head = cid.Undef
updates <- DistanceUpdate{
tkr.updates <- DistanceUpdate{
ID: pid,
Distance: -1,
}
Expand All @@ -247,7 +278,7 @@ func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *
return
}

updates <- DistanceUpdate{
tkr.updates <- DistanceUpdate{
ID: pid,
Distance: track.dist,
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ func followDistance(cctx *cli.Context, include, exclude map[peer.ID]struct{}, pc

fmt.Fprintln(os.Stderr, "Showing provider distance updates, ctrl-c to cancel...")
limit := cctx.Int64("ad-depth-limit")
updates := dtrack.RunDistanceTracker(cctx.Context, include, exclude, pc, limit, trackUpdateIn, timeout)
updates, err := dtrack.RunDistanceTracker(cctx.Context, include, exclude, pc, limit, trackUpdateIn, timeout)
if err != nil {
return err
}
for update := range updates {
if update.Err != nil {
fmt.Fprintln(os.Stderr, "Provider", update.ID, "distance error:", update.Err)
Expand Down
2 changes: 1 addition & 1 deletion version.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "v0.0.10"
"version": "v0.0.11"
}

0 comments on commit 7d29b9c

Please sign in to comment.