diff --git a/bun.lockb b/bun.lockb index 12fd9fc3..768690a2 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/packages/ingester/package.json b/packages/ingester/package.json index 5ecda15d..c07b3cf4 100644 --- a/packages/ingester/package.json +++ b/packages/ingester/package.json @@ -16,7 +16,8 @@ "@withorbit/core": "0.0.1", "@withorbit/store-fs": "0.0.1", "@withorbit/store-shared": "0.0.1", - "ajv": "^8.6.2" + "ajv": "^8.6.2", + "lodash.isequal": "^4.5.0" }, "devDependencies": { "@babel/plugin-syntax-import-attributes": "^7.23.3", diff --git a/packages/ingester/src/ingest.test.ts b/packages/ingester/src/ingest.test.ts index c7d32bfc..376722e0 100644 --- a/packages/ingester/src/ingest.test.ts +++ b/packages/ingester/src/ingest.test.ts @@ -215,6 +215,38 @@ it("only ingests specified sources", async () => { expect(events).toHaveLength(0); }); +it("updates provenance when source metadata changes", async () => { + const mockIngestEvent = mockQATask({ + body: "Question", + answer: "Answer", + provenance: { identifier: "source_identifier", title: "Existing Source" }, + }); + await store.database.putEvents([mockIngestEvent]); + const sources: IngestibleSource[] = [ + { + identifier: "source_identifier" as IngestibleSourceIdentifier, + title: "Renamed Source", + items: [ + { + identifier: "Question+Answer" as IngestibleItemIdentifier, + spec: mockIngestEvent.spec, + }, + ], + }, + ]; + + const events = await ingestSources(sources, store); + expect(events).toHaveLength(1); + expect(events[0].type).toBe(EventType.TaskUpdateProvenanceEvent); + const event = events[0] as TaskUpdateProvenanceEvent; + + expect(event.entityID).toEqual(mockIngestEvent.entityID); + expect(event.provenance).toEqual({ + identifier: "source_identifier", + title: "Renamed Source", + }); +}); + it("moves entities across sources", async () => { const mockIngestEvent = mockQATask({ body: "Question", diff --git a/packages/ingester/src/ingest.ts b/packages/ingester/src/ingest.ts index 349a1ea6..efa53365 100644 --- a/packages/ingester/src/ingest.ts +++ b/packages/ingester/src/ingest.ts @@ -17,6 +17,7 @@ import { IngestibleItemIdentifier, IngestibleSource, } from "./ingestible.js"; +import isEqual from "lodash.isequal"; type IngestOptions = { ingestDateMillis: number; @@ -43,6 +44,7 @@ export async function ingestSources( IngestibleItemIdentifier, TaskUpdateDeletedEvent >(); + const updateProvenanceEvents: TaskUpdateProvenanceEvent[] = []; // TODO: create a new query or extend the entity query such that // we can filter on only sources that would have used this ingester @@ -58,25 +60,36 @@ export async function ingestSources( const timeMillis = opts.ingestDateMillis; for (const source of sources) { - const existingEntity = existingGroupedEntities.get(source.identifier); - if (existingEntity) { + const existingEntities = existingGroupedEntities.get(source.identifier); + const provenance = createProvenanceForSource(source); + if (existingEntities) { // determine which tasks are new and which have been deleted const existingEntitiesByItemIdentifiers = - mapEntitiesByItemIdentifier(existingEntity); + mapEntitiesByItemIdentifier(existingEntities); const ingestibleItemsByItemIdentifiers = mapIngestibleItemsByItemIdentifier(source.items); // determine which tasks are newly added for (const [key, potentialNewItem] of ingestibleItemsByItemIdentifiers) { - if (!existingEntitiesByItemIdentifiers.get(key)) { + const existingEntity = existingEntitiesByItemIdentifiers.get(key); + if (existingEntity) { + // task exists + if (!isEqual(provenance, existingEntity.provenance)) { + // task has been updated + updateProvenanceEvents.push( + createUpdateProvenanceEvent( + existingEntity.id, + timeMillis, + provenance, + ), + ); + } + } else { // task is new ingestEvents.set( key, createIngestTaskForSource(source, timeMillis)(potentialNewItem), ); - } else { - // task exists - // TODO: we should make sure the content is up to date } } @@ -100,7 +113,6 @@ export async function ingestSources( } // check if the inserts / deletes are really just moves - const updateProvenanceEvents: TaskUpdateProvenanceEvent[] = []; for (const [ingestedEventItemIdentifier, ingestEvent] of ingestEvents) { const deleteEvent = deleteEvents.get(ingestedEventItemIdentifier); if (deleteEvent) { @@ -108,8 +120,15 @@ export async function ingestSources( // delete the old events and generate an update provenance event instead deleteEvents.delete(ingestedEventItemIdentifier); ingestEvents.delete(ingestedEventItemIdentifier); + if (!ingestEvent.provenance) { + throw new Error("all ingest tasks should have provenance"); + } updateProvenanceEvents.push( - createUpdateProvenanceEvent(deleteEvent.entityID, ingestEvent), + createUpdateProvenanceEvent( + deleteEvent.entityID, + ingestEvent.timestampMillis, + ingestEvent.provenance, + ), ); } } @@ -165,10 +184,7 @@ function mapIngestibleItemsByItemIdentifier(items: IngestibleItem[]) { return mapping; } -function createIngestTaskForSource( - source: IngestibleSource, - insertTimestampMilis: number, -): (item: IngestibleItem) => TaskIngestEvent { +function createProvenanceForSource(source: IngestibleSource) { const provenance: TaskProvenance = { identifier: source.identifier, title: source.title, @@ -177,6 +193,14 @@ function createIngestTaskForSource( ? { colorPaletteName: source.colorPaletteName } : {}), }; + return provenance; +} + +function createIngestTaskForSource( + source: IngestibleSource, + insertTimestampMilis: number, +): (item: IngestibleItem) => TaskIngestEvent { + const provenance = createProvenanceForSource(source); return (item) => { return { id: generateUniqueID(), @@ -209,13 +233,14 @@ function createDeleteTaskEvent( function createUpdateProvenanceEvent( originalEntityId: TaskID, - ingestEvent: TaskIngestEvent, + timestampMillis: number, + provenance: TaskProvenance, ): TaskUpdateProvenanceEvent { return { type: EventType.TaskUpdateProvenanceEvent, id: generateUniqueID(), entityID: originalEntityId, - timestampMillis: ingestEvent.timestampMillis, - provenance: ingestEvent.provenance, + timestampMillis, + provenance, }; }