From 2fe55958e86d6b483d04742d186b8401d692090e Mon Sep 17 00:00:00 2001 From: FluxCapacitor2 <31071265+FluxCapacitor2@users.noreply.github.com> Date: Wed, 8 Jan 2025 17:59:43 -0500 Subject: [PATCH] Fix rare unique constraint violation when inserting vec chunks to override ones that haven't yet been deleted --- app/database/db_sqlite.go | 10 ++++++++-- app/processqueue.go | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/app/database/db_sqlite.go b/app/database/db_sqlite.go index 33667d4..474c2db 100644 --- a/app/database/db_sqlite.go +++ b/app/database/db_sqlite.go @@ -682,7 +682,7 @@ func (db *SQLiteDatabase) PopEmbedQueue(ctx context.Context, limit int, source s // Find the first item in the queue and update it in one step. If the row isn't returned, another process must have updated it at the same time. rows, err := db.conn.QueryContext(ctx, ` UPDATE embed_queue SET status = ?, updatedAt = CURRENT_TIMESTAMP WHERE id IN ( - SELECT embed_queue.id FROM embed_queue JOIN pages ON embed_queue.page = pages.id WHERE embed_queue.status = ? AND pages.source = ? ORDER BY embed_queue.addedAt LIMIT ? + SELECT embed_queue.id FROM embed_queue JOIN pages ON embed_queue.page = pages.id WHERE embed_queue.status = ? AND pages.source = ? ORDER BY embed_queue.page, embed_queue.chunkIndex LIMIT ? ) RETURNING id, status, page, chunkIndex, chunk; `, Processing, Pending, source, limit) @@ -728,7 +728,13 @@ func (db *SQLiteDatabase) AddEmbedding(ctx context.Context, pageID int64, source } id := int64(-1) - err = tx.QueryRowContext(ctx, "INSERT INTO vec_chunks (page, chunkIndex, chunk) VALUES (?, ?, ?) RETURNING id;", pageID, chunkIndex, chunk).Scan(&id) + err = tx.QueryRowContext(ctx, ` + -- Delete leftover chunks that may exist from previous embedding jobs. + -- This statement only works because we pop elements from the embed queue in ascending order by chunkIndex. + DELETE FROM vec_chunks WHERE page = ? AND chunkIndex >= ?; + REPLACE INTO vec_chunks (page, chunkIndex, chunk) VALUES (?, ?, ?) RETURNING id; + `, pageID, chunkIndex, pageID, chunkIndex, chunk).Scan(&id) + if err != nil { if err := tx.Rollback(); err != nil { return err diff --git a/app/processqueue.go b/app/processqueue.go index c615be8..e9d5933 100644 --- a/app/processqueue.go +++ b/app/processqueue.go @@ -125,7 +125,7 @@ func processEmbedQueue(ctx context.Context, db database.Database, src config.Sou for i, item := range items { err = db.AddEmbedding(ctx, item.PageID, src.ID, item.ChunkIndex, item.Content, vectors[i]) if err != nil { - slogctx.Error(ctx, "Failed to save embedding", "error", err) + slogctx.Error(ctx, "Failed to save embedding", "error", err, "page", item.PageID, "chunkIndex", item.ChunkIndex) markFailure(item.ID) return }