Skip to content

Commit

Permalink
ingester now updates task provenance when source metadata changes
Browse files Browse the repository at this point in the history
  • Loading branch information
andymatuschak committed Apr 5, 2024
1 parent 5e26242 commit c5f9d9c
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 17 deletions.
Binary file modified bun.lockb
Binary file not shown.
3 changes: 2 additions & 1 deletion packages/ingester/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"@withorbit/core": "0.0.1",
"@withorbit/store-fs": "0.0.1",
"@withorbit/store-shared": "0.0.1",
"ajv": "^8.6.2"
"ajv": "^8.6.2",
"lodash.isequal": "^4.5.0"
},
"devDependencies": {
"@babel/plugin-syntax-import-attributes": "^7.23.3",
Expand Down
32 changes: 32 additions & 0 deletions packages/ingester/src/ingest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,38 @@ it("only ingests specified sources", async () => {
expect(events).toHaveLength(0);
});

it("updates provenance when source metadata changes", async () => {
const mockIngestEvent = mockQATask({
body: "Question",
answer: "Answer",
provenance: { identifier: "source_identifier", title: "Existing Source" },
});
await store.database.putEvents([mockIngestEvent]);
const sources: IngestibleSource[] = [
{
identifier: "source_identifier" as IngestibleSourceIdentifier,
title: "Renamed Source",
items: [
{
identifier: "Question+Answer" as IngestibleItemIdentifier,
spec: mockIngestEvent.spec,
},
],
},
];

const events = await ingestSources(sources, store);
expect(events).toHaveLength(1);
expect(events[0].type).toBe(EventType.TaskUpdateProvenanceEvent);
const event = events[0] as TaskUpdateProvenanceEvent;

expect(event.entityID).toEqual(mockIngestEvent.entityID);
expect(event.provenance).toEqual({
identifier: "source_identifier",
title: "Renamed Source",
});
});

it("moves entities across sources", async () => {
const mockIngestEvent = mockQATask({
body: "Question",
Expand Down
57 changes: 41 additions & 16 deletions packages/ingester/src/ingest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
IngestibleItemIdentifier,
IngestibleSource,
} from "./ingestible.js";
import isEqual from "lodash.isequal";

type IngestOptions = {
ingestDateMillis: number;
Expand All @@ -43,6 +44,7 @@ export async function ingestSources(
IngestibleItemIdentifier,
TaskUpdateDeletedEvent
>();
const updateProvenanceEvents: TaskUpdateProvenanceEvent[] = [];

// TODO: create a new query or extend the entity query such that
// we can filter on only sources that would have used this ingester
Expand All @@ -58,25 +60,36 @@ export async function ingestSources(
const timeMillis = opts.ingestDateMillis;

for (const source of sources) {
const existingEntity = existingGroupedEntities.get(source.identifier);
if (existingEntity) {
const existingEntities = existingGroupedEntities.get(source.identifier);
const provenance = createProvenanceForSource(source);
if (existingEntities) {
// determine which tasks are new and which have been deleted
const existingEntitiesByItemIdentifiers =
mapEntitiesByItemIdentifier(existingEntity);
mapEntitiesByItemIdentifier(existingEntities);
const ingestibleItemsByItemIdentifiers =
mapIngestibleItemsByItemIdentifier(source.items);

// determine which tasks are newly added
for (const [key, potentialNewItem] of ingestibleItemsByItemIdentifiers) {
if (!existingEntitiesByItemIdentifiers.get(key)) {
const existingEntity = existingEntitiesByItemIdentifiers.get(key);
if (existingEntity) {
// task exists
if (!isEqual(provenance, existingEntity.provenance)) {
// task has been updated
updateProvenanceEvents.push(
createUpdateProvenanceEvent(
existingEntity.id,
timeMillis,
provenance,
),
);
}
} else {
// task is new
ingestEvents.set(
key,
createIngestTaskForSource(source, timeMillis)(potentialNewItem),
);
} else {
// task exists
// TODO: we should make sure the content is up to date
}
}

Expand All @@ -100,16 +113,22 @@ export async function ingestSources(
}

// check if the inserts / deletes are really just moves
const updateProvenanceEvents: TaskUpdateProvenanceEvent[] = [];
for (const [ingestedEventItemIdentifier, ingestEvent] of ingestEvents) {
const deleteEvent = deleteEvents.get(ingestedEventItemIdentifier);
if (deleteEvent) {
// item identifier is in both the ingest and delete map, must be a move:
// delete the old events and generate an update provenance event instead
deleteEvents.delete(ingestedEventItemIdentifier);
ingestEvents.delete(ingestedEventItemIdentifier);
if (!ingestEvent.provenance) {
throw new Error("all ingest tasks should have provenance");
}
updateProvenanceEvents.push(
createUpdateProvenanceEvent(deleteEvent.entityID, ingestEvent),
createUpdateProvenanceEvent(
deleteEvent.entityID,
ingestEvent.timestampMillis,
ingestEvent.provenance,
),
);
}
}
Expand Down Expand Up @@ -165,10 +184,7 @@ function mapIngestibleItemsByItemIdentifier(items: IngestibleItem[]) {
return mapping;
}

function createIngestTaskForSource(
source: IngestibleSource,
insertTimestampMilis: number,
): (item: IngestibleItem) => TaskIngestEvent {
function createProvenanceForSource(source: IngestibleSource) {
const provenance: TaskProvenance = {
identifier: source.identifier,
title: source.title,
Expand All @@ -177,6 +193,14 @@ function createIngestTaskForSource(
? { colorPaletteName: source.colorPaletteName }
: {}),
};
return provenance;
}

function createIngestTaskForSource(
source: IngestibleSource,
insertTimestampMilis: number,
): (item: IngestibleItem) => TaskIngestEvent {
const provenance = createProvenanceForSource(source);
return (item) => {
return {
id: generateUniqueID(),
Expand Down Expand Up @@ -209,13 +233,14 @@ function createDeleteTaskEvent(

function createUpdateProvenanceEvent(
originalEntityId: TaskID,
ingestEvent: TaskIngestEvent,
timestampMillis: number,
provenance: TaskProvenance,
): TaskUpdateProvenanceEvent {
return {
type: EventType.TaskUpdateProvenanceEvent,
id: generateUniqueID(),
entityID: originalEntityId,
timestampMillis: ingestEvent.timestampMillis,
provenance: ingestEvent.provenance,
timestampMillis,
provenance,
};
}

0 comments on commit c5f9d9c

Please sign in to comment.