Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record stats inside indexing batch loop #83

Merged
merged 1 commit into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions indexers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const deleteCommand = `{ "delete" : { "_id": %d, "version": %d, "version_type":
type Stats struct {
Indexed int64 // total number of documents indexed
Deleted int64 // total number of documents deleted
Elapsed time.Duration // total time spent actually indexing
Elapsed time.Duration // total time spent actually indexing (excludes poll delay)
}

// Indexer is base interface for indexers
Expand Down Expand Up @@ -84,8 +84,8 @@ func (i *baseIndexer) log() *slog.Logger {
return slog.With("indexer", i.name)
}

// records a complete index and updates statistics
func (i *baseIndexer) recordComplete(indexed, deleted int, elapsed time.Duration) {
// records indexing activity and updates statistics
func (i *baseIndexer) recordActivity(indexed, deleted int, elapsed time.Duration) {
i.stats.Indexed += int64(indexed)
i.stats.Deleted += int64(deleted)
i.stats.Elapsed += elapsed
Expand Down
21 changes: 10 additions & 11 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,11 @@
i.log().Debug("indexing newer than last modified", "index", physicalIndex, "last_modified", lastModified)

// now index our docs
start := time.Now()
created, updated, deleted, err := i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
err = i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
if err != nil {
return "", fmt.Errorf("error indexing documents: %w", err)
}

i.recordComplete(created+updated, deleted, time.Since(start))

// if the index didn't previously exist or we are rebuilding, remap to our alias
if remapAlias {
err := i.updateAlias(physicalIndex)
Expand Down Expand Up @@ -153,7 +150,7 @@
`

// IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time
func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, int, error) {
func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) error {
totalFetched, totalCreated, totalUpdated, totalDeleted := 0, 0, 0, 0

var modifiedOn time.Time
Expand Down Expand Up @@ -193,17 +190,17 @@

// no more rows? return
if err == sql.ErrNoRows {
return 0, 0, 0, nil
return nil

Check warning on line 193 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L193

Added line #L193 was not covered by tests
}
if err != nil {
return 0, 0, 0, err
return err

Check warning on line 196 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L196

Added line #L196 was not covered by tests
}
defer rows.Close()

for rows.Next() {
err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON)
if err != nil {
return 0, 0, 0, err
return err

Check warning on line 203 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L203

Added line #L203 was not covered by tests
}

batchFetched++
Expand All @@ -226,14 +223,14 @@
// write to elastic search in batches
if batchFetched%i.batchSize == 0 {
if err := indexSubBatch(subBatch); err != nil {
return 0, 0, 0, err
return err

Check warning on line 226 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L226

Added line #L226 was not covered by tests
}
}
}

if subBatch.Len() > 0 {
if err := indexSubBatch(subBatch); err != nil {
return 0, 0, 0, err
return err

Check warning on line 233 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L233

Added line #L233 was not covered by tests
}
}

Expand Down Expand Up @@ -268,13 +265,15 @@
log.Debug("indexed contact batch")
}

i.recordActivity(batchCreated+batchUpdated, batchDeleted, time.Since(batchStart))

// last modified stayed the same and we didn't add anything, seen it all, break out
if lastModified.Equal(queryModified) && batchCreated == 0 {
break
}
}

return totalCreated, totalUpdated, totalDeleted, nil
return nil
}

func (i *ContactIndexer) GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error) {
Expand Down
Loading