diff --git a/indexers/base.go b/indexers/base.go index 244b9bb..6737329 100644 --- a/indexers/base.go +++ b/indexers/base.go @@ -24,7 +24,7 @@ const deleteCommand = `{ "delete" : { "_id": %d, "version": %d, "version_type": type Stats struct { Indexed int64 // total number of documents indexed Deleted int64 // total number of documents deleted - Elapsed time.Duration // total time spent actually indexing + Elapsed time.Duration // total time spent actually indexing (excludes poll delay) } // Indexer is base interface for indexers @@ -84,8 +84,8 @@ func (i *baseIndexer) log() *slog.Logger { return slog.With("indexer", i.name) } -// records a complete index and updates statistics -func (i *baseIndexer) recordComplete(indexed, deleted int, elapsed time.Duration) { +// records indexing activity and updates statistics +func (i *baseIndexer) recordActivity(indexed, deleted int, elapsed time.Duration) { i.stats.Indexed += int64(indexed) i.stats.Deleted += int64(deleted) i.stats.Elapsed += elapsed diff --git a/indexers/contacts.go b/indexers/contacts.go index 32486e5..e15520d 100644 --- a/indexers/contacts.go +++ b/indexers/contacts.go @@ -63,14 +63,11 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error i.log().Debug("indexing newer than last modified", "index", physicalIndex, "last_modified", lastModified) // now index our docs - start := time.Now() - created, updated, deleted, err := i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild) + err = i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild) if err != nil { return "", fmt.Errorf("error indexing documents: %w", err) } - i.recordComplete(created+updated, deleted, time.Since(start)) - // if the index didn't previously exist or we are rebuilding, remap to our alias if remapAlias { err := i.updateAlias(physicalIndex) @@ -153,7 +150,7 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM ( ` // IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time -func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, int, error) { +func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) error { totalFetched, totalCreated, totalUpdated, totalDeleted := 0, 0, 0, 0 var modifiedOn time.Time @@ -193,17 +190,17 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st // no more rows? return if err == sql.ErrNoRows { - return 0, 0, 0, nil + return nil } if err != nil { - return 0, 0, 0, err + return err } defer rows.Close() for rows.Next() { err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON) if err != nil { - return 0, 0, 0, err + return err } batchFetched++ @@ -226,14 +223,14 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st // write to elastic search in batches if batchFetched%i.batchSize == 0 { if err := indexSubBatch(subBatch); err != nil { - return 0, 0, 0, err + return err } } } if subBatch.Len() > 0 { if err := indexSubBatch(subBatch); err != nil { - return 0, 0, 0, err + return err } } @@ -268,13 +265,15 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st log.Debug("indexed contact batch") } + i.recordActivity(batchCreated+batchUpdated, batchDeleted, time.Since(batchStart)) + // last modified stayed the same and we didn't add anything, seen it all, break out if lastModified.Equal(queryModified) && batchCreated == 0 { break } } - return totalCreated, totalUpdated, totalDeleted, nil + return nil } func (i *ContactIndexer) GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error) {