[Ingester] Create one goroutine per tenant to flush traces to disk #4483
base: main
Conversation
Signed-off-by: Joe Elliott <number101010@gmail.com>
// Flush triggers a flush of all in memory traces to disk. This is called
// by the lifecycler on shutdown and will put our traces in the WAL to be
// replayed.
func (i *Ingester) Flush() {
This is the old way we flushed traces to disk on shutdown. It was difficult to find and was driven through an obsolete ring mechanic, "FlushTransferer", so I removed it and moved the logic to "stopping". I think the new way is more easily discoverable and clear.
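Roughly, the replacement looks like the sketch below. The cutAllInstancesToWal call and the cutToWal* fields come from this PR's diff and comments; the rest of the plumbing is illustrative and not the exact Tempo code.

```go
// Hedged sketch of the new shutdown path: the stopping hook stops the
// per-tenant loops, waits for them, then does a final cut to the WAL.
func (i *Ingester) stopping(_ error) error {
	// stop the per-tenant cut-to-WAL loops and wait for them to exit
	close(i.cutToWalStop)
	i.cutToWalWg.Wait()

	// final pass: move any remaining live traces into the WAL so they can be replayed on restart
	i.cutAllInstancesToWal()

	return nil
}
```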
	i.cutOneInstanceToWal(instance, true)
} else {
-	i.sweepAllInstances(true)
+	i.cutAllInstancesToWal()
renamed these funcs for clarity
-func (i *Ingester) sweepAllInstances(immediate bool) {
+// cutToWalLoop kicks off a goroutine for the passed instance that will periodically cut traces to WAL.
+// it signals completion through cutToWalWg, waits for cutToWalStart and stops on cutToWalStop.
+func (i *Ingester) cutToWalLoop(instance *instance) {
The new per-tenant loop that drives flushing live traces to disk.
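For readers following along in the conversation, here is a hedged, self-contained approximation of what such a per-tenant loop can look like. Only the cutToWalStart / cutToWalStop / cutToWalWg names and the cutToWalLoop signature come from this PR; the trimmed-down types and flushInterval are assumptions, not the exact Tempo code.

```go
package ingester

import (
	"sync"
	"time"
)

// instance stands in for a single tenant's in-memory trace state; the real
// struct lives in Tempo's ingester package.
type instance struct{ tenantID string }

// cutToWal stands in for cutting this tenant's completed live traces to the WAL.
func (inst *instance) cutToWal() {}

// Ingester here carries only the signalling primitives mentioned in the review
// comment: cutToWalStart gates the loops until the ingester is ready,
// cutToWalStop ends them, and cutToWalWg lets shutdown wait for every loop.
type Ingester struct {
	cutToWalStart chan struct{}
	cutToWalStop  chan struct{}
	cutToWalWg    sync.WaitGroup
	flushInterval time.Duration
}

// cutToWalLoop kicks off one goroutine for the passed instance that
// periodically cuts that tenant's traces to the WAL, so one slow tenant no
// longer stalls every other tenant behind a single shared loop.
func (i *Ingester) cutToWalLoop(inst *instance) {
	i.cutToWalWg.Add(1)

	go func() {
		defer i.cutToWalWg.Done()

		// wait for the start signal (e.g. WAL replay finished) or stop early
		select {
		case <-i.cutToWalStart:
		case <-i.cutToWalStop:
			return
		}

		ticker := time.NewTicker(i.flushInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				inst.cutToWal()
			case <-i.cutToWalStop:
				return
			}
		}
	}()
}
```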
@@ -256,7 +275,6 @@ func (i *Ingester) handleComplete(ctx context.Context, op *flushOp) (retry bool,
	}

	start := time.Now()
	level.Info(log.Logger).Log("msg", "completing block", "tenant", op.userID, "blockID", op.blockID)
This was being logged twice and the first was actually wrong. Fixed it and removed the second log.
	}

	i.pushErr.Store(ErrStarting)

	i.local = store.WAL().LocalBackend()

-	lc, err := ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", cfg.OverrideRingKey, true, log.Logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
+	lc, err := ring.NewLifecycler(cfg.LifecyclerConfig, nil, "ingester", cfg.OverrideRingKey, true, log.Logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
no longer register ourselves as a "FlushTransferer"
https://github.com/grafana/dskit/blob/main/ring/lifecycler.go#L181
This is deprecated logic that we were only using to drive the flush-to-disk-on-shutdown behavior. Removed in favor of just doing it clearly in the stopping func.
@@ -351,19 +362,6 @@ func (i *Ingester) getInstances() []*instance {
	return instances
}

-// stopIncomingRequests implements ring.Lifecycler.
only called in one spot so removed
	i.pushErr.Store(ErrShuttingDown)
}

-// TransferOut implements ring.Lifecycler.
only existed to satisfy the FlushTransferer interface
What this PR does:
We have identified a failure mode in the ingesters caused by using a single goroutine to flush all live traces to disk. If a heavy query or other event causes resource starvation, this goroutine falls behind, the ingester's memory balloons, and it starts refusing traces with a LIVE_TRACES_EXCEEDED error. In an extreme case it will OOM.
This PR creates one goroutine per tenant that manages the flushing of traces to disk for that tenant. This is similar to the local blocks in the metrics generator. I would have preferred to have the goroutine lifecycle managed by the instance itself, but that would be counter to the way things are currently designed and would have required more changes.
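As a rough illustration of the one-goroutine-per-tenant shape (building on the cutToWalLoop sketch earlier in this conversation), the loop would be kicked off when a tenant's instance is first created. getOrCreateInstance, newInstance, and the fields used below are approximations for illustration, not the exact Tempo code.

```go
// Sketch only: start one cut-to-WAL goroutine the first time a tenant shows up.
func (i *Ingester) getOrCreateInstance(tenantID string) (*instance, error) {
	i.instancesMtx.Lock()
	defer i.instancesMtx.Unlock()

	if inst, ok := i.instances[tenantID]; ok {
		return inst, nil
	}

	inst, err := newInstance(tenantID) // hypothetical constructor
	if err != nil {
		return nil, err
	}
	i.instances[tenantID] = inst

	// one flush goroutine per tenant instead of a single shared sweep loop
	i.cutToWalLoop(inst)

	return inst, nil
}
```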
Other Change
At one point I was having issues tracking down a bug in flushing blocks. I attempted to use the local jsonnet example to debug but it was out of date. So that is also patched up.
Checklist
CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]