Skip to content

Commit

Permalink
serious refactor to dramatically simplify (and make more efficient + …
Browse files Browse the repository at this point in the history
…correct) the restoration logic. and more tests for it.
  • Loading branch information
TristanH committed Dec 10, 2023
1 parent 4d0b936 commit e7f46f6
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 88 deletions.
131 changes: 47 additions & 84 deletions src/rx-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,6 @@ export class RxQueryBase<
// no cache backend provided, do nothing
return;
}

if (this._persistentQueryCacheResult) {
// we already restored the cache once, no need to run twice
return;
Expand All @@ -553,25 +552,27 @@ export class RxQueryBase<
console.log(`no persistent query cache found in the backend, returning early ${this.toString()}`);
return;
}

// eslint-disable-next-line no-console
console.time(`Restoring persistent querycache ${this.toString()}`);

const primaryPath = this.collection.schema.primaryPath;
const results = await Promise.all([
this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lwt`),
this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lb`),
]);
const [lwt, limitBufferIds] = [results[0] as string | null, results[1] as string[] | null];
const lwt = results[0] as string | null;
const limitBufferIds = new Set<string>(results[1] ?? []);

const primaryPath = this.collection.schema.primaryPath;
if (!lwt) {
return;
}

this._persistentQueryCacheResult = value ?? undefined;
this._persistentQueryCacheResultLwt = lwt ?? undefined;
this._persistentQueryCacheResult = value;
this._persistentQueryCacheResultLwt = lwt;

const persistedQueryCacheIds = new Set(this._persistentQueryCacheResult);

let docsData: RxDocumentData<RxDocType>[] = [];
const changedDocIds: Set<string> = new Set();

// query all docs updated > last persisted, limit to an arbitrary 1_000_000 (10x of what we consider our largest library)
const {documents: changedDocs} = await this.collection.storageInstance.getChangedDocumentsSince(
1_000_000,
Expand All @@ -580,72 +581,24 @@ export class RxQueryBase<
{id: '', lwt: Math.floor(Number(lwt)) - UPDATE_DRIFT}
);

for (const changedDoc of changedDocs) {
const docWasInOldPersistedResults = persistedQueryCacheIds.has(changedDoc[primaryPath] as string);
const docMatchesNow = this.doesDocumentDataMatch(changedDoc);
const changedDocIds = new Set<string>(changedDocs.map((d) => d[primaryPath] as string));

if (docWasInOldPersistedResults && !docMatchesNow && this.mangoQuery.limit && !limitBufferIds?.length) {
// Unfortunately if any doc was removed from the results since the last result,
// there is no way for us to be sure our calculated results are correct.
// So we should simply give up and re-exec the query.
this._persistentQueryCacheResult = value ?? undefined;
this._persistentQueryCacheResultLwt = lwt ?? undefined;
return;
}

if (docWasInOldPersistedResults) {
/*
* no need to fetch again, we already got the doc from the list of changed docs, and therefore we filter
* out docs that are no longer matching the query.
*/
persistedQueryCacheIds.delete(changedDoc[primaryPath] as string);
}

// ignore deleted docs or docs that do not match the query
if (!docMatchesNow) {
continue;
}

// add to document cache
this.collection._docCache.getCachedRxDocument(changedDoc);

// add to docs
docsData.push(changedDoc);
changedDocIds.add(changedDoc[primaryPath] as string);
}

// Get the rest of the doc ids we need to consider:
const moreDocIdsToConsider = new Set(Array.from(persistedQueryCacheIds).concat(limitBufferIds ?? []));

const nonRestoredDocIds: string[] = [];
for (const docId of moreDocIdsToConsider) {
if (changedDocIds.has(docId)) {
// we already fetched this doc because it changed, don't get it again
continue;
}
const docIdsWeNeedToFetch = [...persistedQueryCacheIds, ...limitBufferIds].filter((id) => !changedDocIds.has(id));

// first try to fill from docCache
const docData = this.collection._docCache.getLatestDocumentDataIfExists(docId);
if (docData && this.doesDocumentDataMatch(docData)) {
docsData.push(docData);
}
// We use _queryCollectionByIds to fetch the remaining docs we need efficiently, pulling
// from query cache if we can (and the storageInstance by ids if we can't):
const otherPotentialMatchingDocs: RxDocumentData<RxDocType>[] = [];
await _queryCollectionByIds(this as any, otherPotentialMatchingDocs, docIdsWeNeedToFetch);

if (!docData) {
nonRestoredDocIds.push(docId);
// Now that we have all potential documents, we just filter (in-memory) the ones that still match our query:
let docsData: RxDocumentData<RxDocType>[] = [];
for (const doc of changedDocs.concat(otherPotentialMatchingDocs)) {
if (this.doesDocumentDataMatch(doc)) {
docsData.push(doc);
}
}

// otherwise get from storage
if (nonRestoredDocIds.length > 0) {
const docsMap = await this.collection.storageInstance.findDocumentsById(nonRestoredDocIds, false);
Object.values(docsMap).forEach(docData => {
if (this.doesDocumentDataMatch(docData)) {
this.collection._docCache.getCachedRxDocument(docData);
docsData.push(docData);
}
});
}

// Sort the documents by the query's sort field:
const normalizedMangoQuery = normalizeMangoQuery<RxDocType>(
this.collection.schema.jsonSchema,
this.mangoQuery
Expand All @@ -654,25 +607,36 @@ export class RxQueryBase<
const limit = normalizedMangoQuery.limit ? normalizedMangoQuery.limit : Infinity;
docsData = docsData.sort(sortComparator);

// We know for sure that all persisted and limit buffer ids (and changed docs before them) are in the correct
// result set. And we can't be sure about anything past that point. So cut it off there:
const lastValidIndex = docsData.findLastIndex((d) => limitBufferIds.has(d[primaryPath] as string) || persistedQueryCacheIds.has(d[primaryPath] as string));
docsData = docsData.slice(0, lastValidIndex + 1);

// Now this is the trickiest part.
// If we somehow have fewer docs than the limit of our query
// (and this wasn't the case before persistence)
// then there is no way for us to know the correct results, and we re-exec:
const unchangedItemsMayNowBeInResults = (
this.mangoQuery.limit &&
docsData.length < this.mangoQuery.limit &&
persistedQueryCacheIds.size >= this.mangoQuery.limit
);
if (unchangedItemsMayNowBeInResults) {
this._persistentQueryCacheResult = undefined;
this._persistentQueryCacheResultLwt = undefined;
return;
}

// Our finalResults are the actual results of this query, and pastLimitItems are any remaining matching
// documents we have left over (past the limit).
const pastLimitItems = docsData.slice(limit);
const finalResults = docsData.slice(0, limit);

// If we had a limit buffer before, and now we don't... it means the limit buffer was exhausted.
// To be confident we're not missing anything, we need to re-exec the query:
if (limitBufferIds?.length && pastLimitItems.length === 0) {
this._persistentQueryCacheResult = value ?? undefined;
this._persistentQueryCacheResultLwt = lwt ?? undefined;
return;
}
// If there are still items past the first LIMIT items, try to restore the limit buffer with them:
if (limitBufferIds?.length && pastLimitItems.length > 0) {
const lastLimitBufferIndex = pastLimitItems.findLastIndex((d) => limitBufferIds.includes(d[primaryPath] as string));
if (lastLimitBufferIndex !== -1){
// If the limit buffer still has room, simply restore it:
this._limitBufferResults = pastLimitItems.slice(0, Math.max(lastLimitBufferIndex + 1, this._limitBufferSize ?? 0));
} else {
this._limitBufferResults = [];
}
if (limitBufferIds.size && pastLimitItems.length > 0) {
this._limitBufferResults = pastLimitItems;
} else {
this._limitBufferResults = [];
}

// get query into the correct state
Expand All @@ -682,7 +646,6 @@ export class RxQueryBase<

// eslint-disable-next-line no-console
console.timeEnd(`Restoring persistent querycache ${this.toString()}`);

}
}

Expand Down
65 changes: 61 additions & 4 deletions test/unit/rx-query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1769,7 +1769,7 @@ describe('rx-query.test.ts', () => {
const result2 = await query2.exec();

assert.strictEqual(query1._execOverDatabaseCount, 0);
assert.strictEqual(query2._execOverDatabaseCount, 1);
assert.strictEqual(query2._execOverDatabaseCount, 0);
assert.deepStrictEqual(result2.map(item => item.passportId), ['1', '3']);

collection.database.destroy();
Expand Down Expand Up @@ -1844,7 +1844,7 @@ describe('rx-query.test.ts', () => {

// wait half a second so that not all docs are included in lwt
await new Promise((resolve) => {
setTimeout(resolve, 1000);
setTimeout(resolve, 500);
});

// Cache a limited query:
Expand Down Expand Up @@ -2016,16 +2016,17 @@ describe('rx-query.test.ts', () => {
assert.deepStrictEqual(updatedResults.map(h => h.passportId), ['2', '4']);
assert.strictEqual(queryAgain._limitBufferResults?.length, 1);

// But if we go further, and use the last item from the limit buffer, we'll have to re-exec:
// But if we go further, and use the last items from the limit buffer, we'll have to re-exec:
uncacheRxQuery(collection._queryCache, queryAgain);
await collection.find({ selector: { passportId: '4' } }).remove();
await collection.find({ selector: { passportId: '5' } }).remove();

const queryFinal = collection.find(query.mangoQuery);
queryFinal.enableLimitBuffer(3).enablePersistentQueryCache(cache);

const finalResults = await queryFinal.exec();
assert.strictEqual(queryFinal._execOverDatabaseCount, 1);
assert.deepStrictEqual(finalResults.map(h => h.passportId), ['2', '5']);
assert.deepStrictEqual(finalResults.map(h => h.passportId), ['2']);

collection.database.destroy();
});
Expand Down Expand Up @@ -2107,6 +2108,62 @@ describe('rx-query.test.ts', () => {
// how they would be sorted relative to other documents
assert.strictEqual(queryAgain._limitBufferResults?.length, 0);

simulateNewSession(collection);

// If one more doc is removed from our results, we will HAVE to re-exec to ensure
// correct results, test that:
await collection.find({ selector: { passportId: '3' } }).update({
$set: { age: 1 }
});

const queryFinal = collection.find(query.mangoQuery);
queryFinal.enableLimitBuffer(2).enablePersistentQueryCache(cache);

const finalResults = await queryFinal.exec();

// Query re-execs, and gives correct results:
assert.strictEqual(queryFinal._execOverDatabaseCount, 1);
assert.deepStrictEqual(finalResults.map(h => h.passportId), ['4', '5']);

// When we re-exec, the limit buffer will also get filled:
assert.deepStrictEqual(queryFinal._limitBufferResults?.map(h => h.passportId), ['6', '7']);

collection.database.destroy();
});

it('Handles case where we have fewer than LIMIT matches', async () => {
    const { collection, cache } = await setUpLimitBufferSituation();

    // Run a limited, sorted query once with the limit buffer and the persistent
    // query cache enabled, so its results are persisted to `cache`.
    // NOTE(review): per the test title, fewer than `limit` docs are expected to
    // match this selector in the fixture — confirm against setUpLimitBufferSituation().
    const query = collection.find({ limit: 3, sort: [{age: 'asc'}], selector: { age: { $lt: 45 } } });
    query.enableLimitBuffer(2).enablePersistentQueryCache(cache);
    await query.exec();
    simulateNewSession(collection);

    // Remove one doc: the restored results must still be correct, without a re-exec.
    await collection.find({ selector: { passportId: '1' } }).remove();

    const queryRemoved = collection.find(query.mangoQuery);
    queryRemoved.enableLimitBuffer(2).enablePersistentQueryCache(cache);
    const removedResults = await queryRemoved.exec();
    assert.strictEqual(queryRemoved._execOverDatabaseCount, 0);
    assert.deepStrictEqual(removedResults.map(h => h.passportId), ['2']);

    simulateNewSession(collection);

    // Now add some matching docs. Since they changed after the cache was persisted,
    // they should show up in the restored results with no re-exec.
    await collection.find({ selector: { passportId: '5' } }).update({
        $set: { age: 1 }
    });
    await collection.bulkUpsert([
        schemaObjects.human('6', 2),
        schemaObjects.human('7', 3),
    ]);
    const queryAdded = collection.find(query.mangoQuery);
    queryAdded.enableLimitBuffer(2).enablePersistentQueryCache(cache);
    // Execute the freshly created query (not `queryRemoved`): otherwise the
    // `_execOverDatabaseCount` assertion below is trivially true because
    // `queryAdded` would never have run at all.
    const addedResults = await queryAdded.exec();
    assert.strictEqual(queryAdded._execOverDatabaseCount, 0);
    assert.deepStrictEqual(addedResults.map(h => h.passportId), ['5', '6', '7']);

    collection.database.destroy();
});
});
Expand Down

0 comments on commit e7f46f6

Please sign in to comment.