Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist limit buffers #14

Merged
merged 11 commits into from
Dec 10, 2023
199 changes: 94 additions & 105 deletions src/rx-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,6 @@ export class RxQueryBase<


if (this.op === 'count') {
// if we have a persisted query cache result, use the result
if (this._persistentQueryCacheResult) {
// TODO: correct this number, but how?
return Number(this._persistentQueryCacheResult);
}

const preparedQuery = this.getPreparedQuery();
const result = await this.collection.storageInstance.count(preparedQuery);
if (result.mode === 'slow' && !this.collection.database.allowSlowCount) {
Expand Down Expand Up @@ -535,122 +529,119 @@ export class RxQueryBase<
// no cache backend provided, do nothing
return;
}

if (this._persistentQueryCacheResult) {
// we already restored the cache once, no need to run twice
return;
}

if (this.mangoQuery.skip || this.op === 'count') {
console.error('The persistent query cache only works on non-skip, non-count queries.');
return;
}

// First, check if there are any query results persisted:
const persistentQueryId = this.persistentQueryId();
const value = await this._persistentQueryCacheBackend.getItem<string[] | string>(`qc:${persistentQueryId}`);
if (!value) {
if (!value || !Array.isArray(value) || value.length === 0) {
// eslint-disable-next-line no-console
console.log(`no persistent query cache found in the backend, returning early ${this.toString()}`);
return;
}

// If there are persisted ids, create our two Sets of ids from the cache:
const persistedQueryCacheIds = new Set<string>();
const limitBufferIds = new Set<string>();

for (const id of value) {
if (id.startsWith('lb-')) {
limitBufferIds.add(id.replace('lb-', ''));
} else {
persistedQueryCacheIds.add(id);
}
}

// eslint-disable-next-line no-console
console.time(`Restoring persistent querycache ${this.toString()}`);

// Next, pull the lwt from the cache:
// TODO: if lwt is too old, should we just give up here? What if there are too many changedDocs?
const lwt = (await this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lwt`)) as string | null;
if (!lwt) {
return;
}

const primaryPath = this.collection.schema.primaryPath;

this._persistentQueryCacheResult = value ?? undefined;
this._persistentQueryCacheResultLwt = lwt ?? undefined;
// query all docs updated > last persisted, limit to an arbitrary 1_000_000 (10x of what we consider our largest library)
const {documents: changedDocs} = await this.collection.storageInstance.getChangedDocumentsSince(
1_000_000,
// make sure we remove the monotonic clock (xxx.01, xxx.02) from the lwt timestamp to avoid issues with
// lookups in indices (dexie)
{id: '', lwt: Math.floor(Number(lwt)) - UPDATE_DRIFT}
);

// if this is a regular query, also load documents into cache
if (Array.isArray(value) && value.length > 0) {
const persistedQueryCacheIds = new Set(this._persistentQueryCacheResult);
const changedDocIds = new Set<string>(changedDocs.map((d) => d[primaryPath] as string));

let docsData: RxDocumentData<RxDocType>[] = [];
const docIdsWeNeedToFetch = [...persistedQueryCacheIds, ...limitBufferIds].filter((id) => !changedDocIds.has(id));

// query all docs updated > last persisted, limit to an arbitrary 1_000_000 (10x of what we consider our largest library)
const {documents: changedDocs} = await this.collection.storageInstance.getChangedDocumentsSince(
1_000_000,
// make sure we remove the monotonic clock (xxx.01, xxx.02) from the lwt timestamp to avoid issues with
// lookups in indices (dexie)
{id: '', lwt: Math.floor(Number(lwt)) - UPDATE_DRIFT}
);
// We use _queryCollectionByIds to fetch the remaining docs we need efficiently, pulling
// from query cache if we can (and the storageInstance by ids if we can't):
const otherPotentialMatchingDocs: RxDocumentData<RxDocType>[] = [];
await _queryCollectionByIds(this as any, otherPotentialMatchingDocs, docIdsWeNeedToFetch);

for (const changedDoc of changedDocs) {
const docWasInOldPersistedResults = persistedQueryCacheIds.has(changedDoc[primaryPath] as string);
const docMatchesNow = this.doesDocumentDataMatch(changedDoc);

if (docWasInOldPersistedResults && !docMatchesNow && this.mangoQuery.limit) {
// Unfortunately if any doc was removed from the results since the last result,
// there is no way for us to be sure our calculated results are correct.
// So we should simply give up and re-exec the query.
this._persistentQueryCacheResult = value ?? undefined;
this._persistentQueryCacheResultLwt = lwt ?? undefined;
return;
}

if (docWasInOldPersistedResults) {
/*
* no need to fetch again, we already got the doc from the list of changed docs, and therefore we filter
* deleted docs as well
*/
persistedQueryCacheIds.delete(changedDoc[primaryPath] as string);
}

// ignore deleted docs or docs that do not match the query
if (!docMatchesNow) {
continue;
}

// add to document cache
this.collection._docCache.getCachedRxDocument(changedDoc);

// add to docs
docsData.push(changedDoc);
// Now that we have all potential documents, we just filter (in-memory) the ones that still match our query:
let docsData: RxDocumentData<RxDocType>[] = [];
for (const doc of changedDocs.concat(otherPotentialMatchingDocs)) {
if (this.doesDocumentDataMatch(doc)) {
docsData.push(doc);
}
}

// fetch remaining persisted doc ids
const nonRestoredDocIds: string[] = [];
for (const docId of persistedQueryCacheIds) {
// first try to fill from docCache
const docData = this.collection._docCache.getLatestDocumentDataIfExists(docId);
if (docData && this.doesDocumentDataMatch(docData)) {
docsData.push(docData);
}

if (!docData) {
nonRestoredDocIds.push(docId);
}
}
// Sort the documents by the query's sort field:
const normalizedMangoQuery = normalizeMangoQuery<RxDocType>(
this.collection.schema.jsonSchema,
this.mangoQuery
);
const sortComparator = getSortComparator(this.collection.schema.jsonSchema, normalizedMangoQuery);
const limit = normalizedMangoQuery.limit ? normalizedMangoQuery.limit : Infinity;
docsData = docsData.sort(sortComparator);

// We know for sure that all persisted and limit buffer ids (and changed docs before them) are in the correct
// result set. And we can't be sure about any past that point. So cut it off there:
const lastValidIndex = docsData.findLastIndex((d) => limitBufferIds.has(d[primaryPath] as string) || persistedQueryCacheIds.has(d[primaryPath] as string));
docsData = docsData.slice(0, lastValidIndex + 1);

// Now this is the trickiest part.
// If we somehow have fewer docs than the limit of our query
// (and this wasn't the case because before persistence)
// then there is no way for us to know the correct results, and we re-exec:
const unchangedItemsMayNowBeInResults = (
this.mangoQuery.limit &&
docsData.length < this.mangoQuery.limit &&
persistedQueryCacheIds.size >= this.mangoQuery.limit
);
if (unchangedItemsMayNowBeInResults) {
return;
}

// otherwise get from storage
if (nonRestoredDocIds.length > 0) {
const docsMap = await this.collection.storageInstance.findDocumentsById(nonRestoredDocIds, false);
Object.values(docsMap).forEach(docData => {
this.collection._docCache.getCachedRxDocument(docData);
docsData.push(docData);
});
}
// Our finalResults are the actual results of this query, and pastLimitItems are any remaining matching
// documents we have left over (past the limit).
const pastLimitItems = docsData.slice(limit);
const finalResults = docsData.slice(0, limit);

const normalizedMangoQuery = normalizeMangoQuery<RxDocType>(
this.collection.schema.jsonSchema,
this.mangoQuery
);
const sortComparator = getSortComparator(this.collection.schema.jsonSchema, normalizedMangoQuery);
const skip = normalizedMangoQuery.skip ? normalizedMangoQuery.skip : 0;
const limit = normalizedMangoQuery.limit ? normalizedMangoQuery.limit : Infinity;
const skipPlusLimit = skip + limit;
docsData = docsData.sort(sortComparator);
docsData = docsData.slice(skip, skipPlusLimit);


// get query into the correct state
this._lastEnsureEqual = now();
this._latestChangeEvent = this.collection._changeEventBuffer.counter;
this._setResultData(docsData);
// If there are still items past the first LIMIT items, try to restore the limit buffer with them:
if (limitBufferIds.size && pastLimitItems.length > 0) {
this._limitBufferResults = pastLimitItems;
} else {
this._limitBufferResults = [];
}

// Finally, set the query's results to what we've pulled from disk:
this._lastEnsureEqual = now();
this._latestChangeEvent = this.collection._changeEventBuffer.counter;
this._setResultData(finalResults);

// eslint-disable-next-line no-console
console.timeEnd(`Restoring persistent querycache ${this.toString()}`);

}
}

Expand Down Expand Up @@ -814,16 +805,6 @@ async function __ensureEqual<RxDocType>(rxQuery: RxQueryBase<RxDocType>): Promis
// we got the new results, we do not have to re-execute, mustReExec stays false
ret = true; // true because results changed
rxQuery._setResultData(eventReduceResult.newResults as any);

/*
* We usually want to persist the cache every time there is an update to the query to guarantee
* correctness. Cache persistence has some "cost", and we therefore try to optimize the number of
* writes.
* So, if any item in the result set was removed, we re-persist the query.
*/
if (rxQuery.mangoQuery.limit && eventReduceResult.limitResultsRemoved) {
await updatePersistentQueryCache(rxQuery);
}
}
}
}
Expand Down Expand Up @@ -880,17 +861,25 @@ async function updatePersistentQueryCache<RxDocType>(rxQuery: RxQueryBase<RxDocT
const backend = rxQuery._persistentQueryCacheBackend;

const key = rxQuery.persistentQueryId();
const value = rxQuery._result?.docsKeys ?? [];

// update _persistedQueryCacheResult
rxQuery._persistentQueryCacheResult = value;
rxQuery._persistentQueryCacheResult = rxQuery._result?.docsKeys ?? [];

const idsToPersist = [...rxQuery._persistentQueryCacheResult];
if (rxQuery._limitBufferResults) {
rxQuery._limitBufferResults.forEach((d) => {
idsToPersist.push(`lb-${d[rxQuery.collection.schema.primaryPath]}`);
});
}
// eslint-disable-next-line no-console
console.time(`Query persistence: persisting results of ${JSON.stringify(rxQuery.mangoQuery)}`);
// persist query cache
const lwt = rxQuery._result?.time ?? RX_META_LWT_MINIMUM;
await backend.setItem(`qc:${String(key)}`, value);
await backend.setItem(`qc:${String(key)}:lwt`, lwt.toString());

await Promise.all([
backend.setItem(`qc:${String(key)}`, idsToPersist),
backend.setItem(`qc:${String(key)}:lwt`, lwt.toString()),
]);

// eslint-disable-next-line no-console
console.timeEnd(`Query persistence: persisting results of ${JSON.stringify(rxQuery.mangoQuery)}`);
Expand Down
Loading
Loading