Skip to content

Commit

Permalink
Debug: write memory profile if heap exceeds threshold (#819)
Browse files Browse the repository at this point in the history
This PR adds adds a debugging flag to periodically check memory usage against a
threshold. If it exceeds the threshold, then a memory profile like
`indexmemory.prof.1` is written to disk. No more than 10 profiles will be
written.

I've already found this more useful than the existing `-memprofile` flag, so I
removed that. It's hard to get insights using that flag, since it only takes a
single profile per shard, forces GC, and forces parallelism to 1.
  • Loading branch information
jtibshirani authored Sep 24, 2024
1 parent a6e3dc2 commit aae71e5
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 34 deletions.
76 changes: 48 additions & 28 deletions build/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"time"

"github.com/bmatcuk/doublestar"
"github.com/dustin/go-humanize"
"github.com/go-enry/go-enry/v2"
"github.com/rs/xid"

Expand Down Expand Up @@ -88,9 +89,6 @@ type Options struct {
// If set, ctags must succeed.
CTagsMustSucceed bool

// Write memory profiles to this file.
MemProfile string

// LargeFiles is a slice of glob patterns, including ** for any number
// of directories, where matching file paths should be indexed
// regardless of their size. The full pattern syntax is here:
Expand Down Expand Up @@ -120,6 +118,12 @@ type Options struct {
// ShardMerging is true if builder should respect compound shards. This is a
// Sourcegraph specific option.
ShardMerging bool

// HeapProfileTriggerBytes is the heap usage in bytes that will trigger a memory profile. If 0, no memory profile will be triggered.
// Profiles will be written to files named `index-memory.prof.n` in the index directory. No more than 10 files are written.
//
// Note: heap checking is "best effort", and it's possible for the process to OOM without triggering the heap profile.
HeapProfileTriggerBytes uint64
}

// HashOptions contains only the options in Options that upon modification leads to IndexState of IndexStateMismatch during the next index building.
Expand Down Expand Up @@ -194,7 +198,6 @@ func (o *Options) Flags(fs *flag.FlagSet) {
fs.StringVar(&o.IndexDir, "index", x.IndexDir, "directory for search indices")
fs.BoolVar(&o.CTagsMustSucceed, "require_ctags", x.CTagsMustSucceed, "If set, ctags calls must succeed.")
fs.Var(largeFilesFlag{o}, "large_file", "A glob pattern where matching files are to be index regardless of their size. You can add multiple patterns by setting this more than once.")
fs.StringVar(&o.MemProfile, "memprofile", "", "write memory profile(s) to `file.shardnum`. Note: sets parallelism to 1.")

// Sourcegraph specific
fs.BoolVar(&o.DisableCTags, "disable_ctags", x.DisableCTags, "If set, ctags will not be called.")
Expand Down Expand Up @@ -270,6 +273,10 @@ type Builder struct {
// indexTime is set by tests for doing reproducible builds.
indexTime time.Time

// heapProfileMu is used to ensure that only one memory profile is written at a time
heapProfileMu sync.Mutex
heapProfileNum int

// a sortable 20 chars long id.
id string

Expand Down Expand Up @@ -835,7 +842,7 @@ func (b *Builder) flush() error {
shard := b.nextShardNum
b.nextShardNum++

if b.opts.Parallelism > 1 && b.opts.MemProfile == "" {
if b.opts.Parallelism > 1 {
b.building.Add(1)
b.throttle <- 1
go func() {
Expand All @@ -860,35 +867,13 @@ func (b *Builder) flush() error {
if err == nil {
b.finishedShards[done.temp] = done.final
}
if b.opts.MemProfile != "" {
// drop memory, and profile.
todo = nil
b.writeMemProfile(b.opts.MemProfile)
}

return b.buildError
}

return nil
}

var profileNumber int

func (b *Builder) writeMemProfile(name string) {
nm := fmt.Sprintf("%s.%d", name, profileNumber)
profileNumber++
f, err := os.Create(nm)
if err != nil {
log.Fatal("could not create memory profile: ", err)
}
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
log.Fatal("could not write memory profile: ", err)
}
f.Close()
log.Printf("wrote mem profile %q", nm)
}

// map [0,inf) to [0,1) monotonically
func squashRange(j int) float64 {
x := float64(j)
Expand Down Expand Up @@ -1011,15 +996,50 @@ func (b *Builder) buildShard(todo []*zoekt.Document, nextShardNum int) (*finishe

sortDocuments(todo)

for _, t := range todo {
for idx, t := range todo {
if err := shardBuilder.Add(*t); err != nil {
return nil, err
}

if idx%10_000 == 0 {
b.CheckMemoryUsage()
}
}

return b.writeShard(name, shardBuilder)
}

// CheckMemoryUsage checks the memory usage of the process and writes a memory profile if the heap usage exceeds the
// configured threshold. NOTE: this method is expensive and should only be used for debugging.
func (b *Builder) CheckMemoryUsage() {
// Don't check memory if heap profiling is disabled, or we've already written 10 profiles
if b.opts.HeapProfileTriggerBytes <= 0 || b.heapProfileNum >= 10 {
return
}

var m runtime.MemStats
runtime.ReadMemStats(&m)

if m.HeapAlloc > b.opts.HeapProfileTriggerBytes && b.heapProfileMu.TryLock() {
defer b.heapProfileMu.Unlock()

log.Printf("writing memory profile, heap usage: %s", humanize.Bytes(m.HeapAlloc))
name := filepath.Join(b.opts.IndexDir, fmt.Sprintf("indexmemory.prof.%d", b.heapProfileNum))
f, err := os.Create(name)
if err != nil {
log.Printf("failed to create memory profile file: %v", err)
return
}

err = pprof.WriteHeapProfile(f)
if err != nil {
log.Printf("failed to write memory profile: %v", err)
}

b.heapProfileNum++
}
}

func (b *Builder) newShardBuilder() (*zoekt.IndexBuilder, error) {
desc := b.opts.RepositoryDescription
desc.HasSymbols = !b.opts.DisableCTags && b.opts.CTagsPath != ""
Expand Down
19 changes: 15 additions & 4 deletions cmd/zoekt-git-index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"runtime/pprof"
"strings"

"github.com/dustin/go-humanize"
"github.com/sourcegraph/zoekt/internal/profiler"
"go.uber.org/automaxprocs/maxprocs"

Expand All @@ -31,8 +32,6 @@ import (
)

func run() int {
cpuprofile := flag.String("cpuprofile", "", "write cpu profile to `file`")

allowMissing := flag.Bool("allow_missing_branches", false, "allow missing branches.")
submodules := flag.Bool("submodules", true, "if set to false, do not recurse into submodules")
branchesStr := flag.String("branches", "HEAD", "git branches to index.")
Expand All @@ -47,13 +46,16 @@ func run() int {
offlineRanking := flag.String("offline_ranking", "", "the name of the file that contains the ranking info.")
offlineRankingVersion := flag.String("offline_ranking_version", "", "a version string identifying the contents in offline_ranking.")
languageMap := flag.String("language_map", "", "a mapping between a language and its ctags processor (a:0,b:3).")

cpuProfile := flag.String("cpuprofile", "", "write cpu profile to `file`")

flag.Parse()

// Tune GOMAXPROCS to match Linux container CPU quota.
_, _ = maxprocs.Set()

if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if *cpuProfile != "" {
f, err := os.Create(*cpuProfile)
if err != nil {
log.Fatal("could not create CPU profile: ", err)
}
Expand Down Expand Up @@ -109,6 +111,15 @@ func run() int {
opts.LanguageMap[m[0]] = ctags.StringToParser(m[1])
}

if heapProfileTrigger := os.Getenv("ZOEKT_HEAP_PROFILE_TRIGGER"); heapProfileTrigger != "" {
trigger, err := humanize.ParseBytes(heapProfileTrigger)
if err != nil {
log.Printf("invalid value for ZOEKT_HEAP_PROFILE_TRIGGER: %v", err)
} else {
opts.HeapProfileTriggerBytes = trigger
}
}

profiler.Init("zoekt-git-index")
exitStatus := 0
for dir, name := range gitRepos {
Expand Down
10 changes: 8 additions & 2 deletions gitindex/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) {
return false, fmt.Errorf("build.NewBuilder: %w", err)
}

// Preparing the build can consume substantial memory, so check usage before starting to index.
builder.CheckMemoryUsage()

var ranks repoPathRanks
var meanRank float64
if opts.BuildOptions.DocumentRanksPath != "" {
Expand Down Expand Up @@ -562,7 +565,7 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) {
names = uniq(names)

log.Printf("attempting to index %d total files", totalFiles)
for _, name := range names {
for idx, name := range names {
keys := fileKeys[name]

for _, key := range keys {
Expand All @@ -574,9 +577,12 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) {
if err := builder.Add(doc); err != nil {
return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err)
}

if idx%10_000 == 0 {
builder.CheckMemoryUsage()
}
}
}

return true, builder.Finish()
}

Expand Down

0 comments on commit aae71e5

Please sign in to comment.