[Ingester] Create one goroutine per tenant to flush traces to disk #4483

Open · wants to merge 9 commits into base: main
Conversation

joe-elliott (Member)

What this PR does:
We have identified a failure mode in the ingesters caused by using a single goroutine to flush all live traces to disk. If a heavy query or another event causes resource starvation, this goroutine falls behind, the ingester's memory balloons, and it starts refusing traces with a LIVE_TRACES_EXCEEDED error. In the extreme case it OOMs.

This PR creates one goroutine per tenant that manages flushing that tenant's traces to disk. This is similar to the local blocks in the metrics generator. I would have preferred the goroutine lifecycle to be managed by the instance itself, but that runs counter to the way things are currently designed and would have required more changes.
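
For readers unfamiliar with the pattern, here is a minimal, self-contained Go sketch of the one-goroutine-per-tenant flush idea. All names here (tenant, flusher, cutToWAL, watch) are hypothetical stand-ins for illustration, not the types or functions used in this PR:

package main

import (
    "fmt"
    "sync"
    "time"
)

// tenant is a hypothetical stand-in for a per-tenant instance.
type tenant struct {
    id string
}

// cutToWAL stands in for cutting this tenant's live traces to the WAL.
func (t *tenant) cutToWAL() {
    fmt.Println("cutting traces to WAL for tenant", t.id)
}

// flusher owns one goroutine per tenant so that a single slow or starved
// goroutine can no longer delay flushing for every tenant at once.
type flusher struct {
    wg   sync.WaitGroup
    stop chan struct{}
}

func newFlusher() *flusher {
    return &flusher{stop: make(chan struct{})}
}

// watch starts the flush loop for one tenant.
func (f *flusher) watch(t *tenant, period time.Duration) {
    f.wg.Add(1)
    go func() {
        defer f.wg.Done()
        ticker := time.NewTicker(period)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                t.cutToWAL()
            case <-f.stop:
                t.cutToWAL() // one final cut on shutdown
                return
            }
        }
    }()
}

// shutdown stops every per-tenant loop and waits for all of them to exit.
func (f *flusher) shutdown() {
    close(f.stop)
    f.wg.Wait()
}

func main() {
    f := newFlusher()
    for _, id := range []string{"tenant-a", "tenant-b"} {
        f.watch(&tenant{id: id}, 100*time.Millisecond)
    }
    time.Sleep(300 * time.Millisecond)
    f.shutdown()
}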

Other Change
While tracking down a bug in flushing blocks, I tried to use the local jsonnet example to debug, but it was out of date. That is also patched up here.

Checklist

  • Tests updated
  • Documentation added
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]

Signed-off-by: Joe Elliott <number101010@gmail.com>
// Flush triggers a flush of all in memory traces to disk. This is called
// by the lifecycler on shutdown and will put our traces in the WAL to be
// replayed.
func (i *Ingester) Flush() {
joe-elliott (Member, Author):

This is the old way that we flushed traces to disk on shutdown. It was difficult to find and was driven through an obsolete ring mechanic, "FlushTransferer", so I removed it and moved the logic to "stopping". I think the new way is more discoverable and clearer.

https://github.com/grafana/tempo/pull/4483/files#diff-b17cd433ae9859f67f0056e452237ddf70a44e9821c6128c8990005eaf7decd1R176

Comment on lines +113 to +115
i.cutOneInstanceToWal(instance, true)
} else {
i.sweepAllInstances(true)
i.cutAllInstancesToWal()
joe-elliott (Member, Author):

Renamed these funcs for clarity.

func (i *Ingester) sweepAllInstances(immediate bool) {
// cutToWalLoop kicks off a goroutine for the passed instance that will periodically cut traces to WAL.
// it signals completion through cutToWalWg, waits for cutToWalStart and stops on cutToWalStop.
func (i *Ingester) cutToWalLoop(instance *instance) {
joe-elliott (Member, Author):

The new per-tenant loop that drives flushing live traces to disk.
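
A hedged sketch of the shape this loop takes is below. The cutToWalWg/cutToWalStart/cutToWalStop names come from the doc comment above and cutOneInstanceToWal from the earlier hunk in this conversation; the ticker interval field and the omission of error handling are assumptions, not the PR's literal code:

// illustrative sketch only; see the actual diff for the real implementation
func (i *Ingester) cutToWalLoop(instance *instance) {
    // signal completion so shutdown can wait for every per-tenant loop to exit
    defer i.cutToWalWg.Done()

    // wait for the start signal (e.g. after WAL replay) before the first cut,
    // or bail out immediately if we are already stopping
    select {
    case <-i.cutToWalStart:
    case <-i.cutToWalStop:
        return
    }

    ticker := time.NewTicker(i.cfg.FlushCheckPeriod) // assumed flush interval field
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            // periodic, non-immediate cut of this tenant's idle traces to the WAL
            i.cutOneInstanceToWal(instance, false)
        case <-i.cutToWalStop:
            return
        }
    }
}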

@@ -256,7 +275,6 @@ func (i *Ingester) handleComplete(ctx context.Context, op *flushOp) (retry bool,
}

start := time.Now()
level.Info(log.Logger).Log("msg", "completing block", "tenant", op.userID, "blockID", op.blockID)
joe-elliott (Member, Author):

This was being logged twice and the first message was actually wrong. Fixed the first and removed the second log.

}

i.pushErr.Store(ErrStarting)

i.local = store.WAL().LocalBackend()

lc, err := ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", cfg.OverrideRingKey, true, log.Logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
lc, err := ring.NewLifecycler(cfg.LifecyclerConfig, nil, "ingester", cfg.OverrideRingKey, true, log.Logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
joe-elliott (Member, Author):

We no longer register ourselves as a "FlushTransferer":

https://github.com/grafana/dskit/blob/main/ring/lifecycler.go#L181

This is deprecated logic that we were only using to drive the flush-to-disk-on-shutdown behavior. It has been removed in favor of doing it explicitly in the stopping func.
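
As a rough sketch, and assuming a dskit-style stopping hook (the helper names are borrowed from elsewhere in this diff; the exact ordering and error handling are guesses, not the PR's code), the new shutdown path looks roughly like:

// illustrative sketch only, not the literal stopping func from this PR
func (i *Ingester) stopping(_ error) error {
    // stop accepting writes
    i.pushErr.Store(ErrShuttingDown)

    // stop the per-tenant cut-to-WAL loops and wait for them to exit
    close(i.cutToWalStop)
    i.cutToWalWg.Wait()

    // final cut of everything still in memory so it lands in the WAL and can
    // be replayed on the next startup
    i.cutAllInstancesToWal()

    return nil
}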

@@ -351,19 +362,6 @@ func (i *Ingester) getInstances() []*instance {
return instances
}

// stopIncomingRequests implements ring.Lifecycler.
joe-elliott (Member, Author):

This was only called in one spot, so it has been removed.

i.pushErr.Store(ErrShuttingDown)
}

// TransferOut implements ring.Lifecycler.
joe-elliott (Member, Author):

This only existed to satisfy the FlushTransferer interface.
