Skip to content

Commit

Permalink
Fix rare unique constraint violation when inserting vec chunks to override ones that haven't yet been deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
FluxCapacitor2 committed Jan 8, 2025
1 parent adac523 commit 2fe5595
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
10 changes: 8 additions & 2 deletions app/database/db_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ func (db *SQLiteDatabase) PopEmbedQueue(ctx context.Context, limit int, source s
// Find the first item in the queue and update it in one step. If the row isn't returned, another process must have updated it at the same time.
rows, err := db.conn.QueryContext(ctx, `
UPDATE embed_queue SET status = ?, updatedAt = CURRENT_TIMESTAMP WHERE id IN (
SELECT embed_queue.id FROM embed_queue JOIN pages ON embed_queue.page = pages.id WHERE embed_queue.status = ? AND pages.source = ? ORDER BY embed_queue.addedAt LIMIT ?
SELECT embed_queue.id FROM embed_queue JOIN pages ON embed_queue.page = pages.id WHERE embed_queue.status = ? AND pages.source = ? ORDER BY embed_queue.page, embed_queue.chunkIndex LIMIT ?
) RETURNING id, status, page, chunkIndex, chunk;
`, Processing, Pending, source, limit)

Expand Down Expand Up @@ -728,7 +728,13 @@ func (db *SQLiteDatabase) AddEmbedding(ctx context.Context, pageID int64, source
}

id := int64(-1)
err = tx.QueryRowContext(ctx, "INSERT INTO vec_chunks (page, chunkIndex, chunk) VALUES (?, ?, ?) RETURNING id;", pageID, chunkIndex, chunk).Scan(&id)
err = tx.QueryRowContext(ctx, `
-- Delete leftover chunks that may exist from previous embedding jobs.
-- This statement only works because we pop elements from the embed queue in ascending order by chunkIndex.
DELETE FROM vec_chunks WHERE page = ? AND chunkIndex >= ?;
REPLACE INTO vec_chunks (page, chunkIndex, chunk) VALUES (?, ?, ?) RETURNING id;
`, pageID, chunkIndex, pageID, chunkIndex, chunk).Scan(&id)

if err != nil {
if err := tx.Rollback(); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion app/processqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func processEmbedQueue(ctx context.Context, db database.Database, src config.Sou
for i, item := range items {
err = db.AddEmbedding(ctx, item.PageID, src.ID, item.ChunkIndex, item.Content, vectors[i])
if err != nil {
slogctx.Error(ctx, "Failed to save embedding", "error", err)
slogctx.Error(ctx, "Failed to save embedding", "error", err, "page", item.PageID, "chunkIndex", item.ChunkIndex)
markFailure(item.ID)
return
}
Expand Down

0 comments on commit 2fe5595

Please sign in to comment.