From e7f46f6760f0ea707d32eac72b323d92feaff71f Mon Sep 17 00:00:00 2001
From: Tristan Homsi
Date: Sun, 10 Dec 2023 15:04:50 -0500
Subject: [PATCH] Seriously refactor the restoration logic to dramatically simplify it (and make it more efficient and correct), and add more tests for it.

---
 src/rx-query.ts            | 131 +++++++++++++------------------
 test/unit/rx-query.test.ts |  65 ++++++++++++++++--
 2 files changed, 108 insertions(+), 88 deletions(-)

diff --git a/src/rx-query.ts b/src/rx-query.ts
index 59a8588aa1e..698bf85ff75 100644
--- a/src/rx-query.ts
+++ b/src/rx-query.ts
@@ -535,7 +535,6 @@ export class RxQueryBase<
            // no cache backend provided, do nothing
            return;
        }
-
        if (this._persistentQueryCacheResult) {
            // we already restored the cache once, no need to run twice
            return;
@@ -553,25 +552,27 @@
            console.log(`no persistent query cache found in the backend, returning early ${this.toString()}`);
            return;
        }
+       // eslint-disable-next-line no-console
        console.time(`Restoring persistent querycache ${this.toString()}`);
+       const primaryPath = this.collection.schema.primaryPath;

        const results = await Promise.all([
            this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lwt`),
            this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lb`),
        ]);
-       const [lwt, limitBufferIds] = [results[0] as string | null, results[1] as string[] | null];
+       const lwt = results[0] as string | null;
+       const limitBufferIds = new Set(results[1] ?? []);

-       const primaryPath = this.collection.schema.primaryPath;
+       if (!lwt) {
+           return;
+       }

-       this._persistentQueryCacheResult = value ?? undefined;
-       this._persistentQueryCacheResultLwt = lwt ?? undefined;
+       this._persistentQueryCacheResult = value;
+       this._persistentQueryCacheResultLwt = lwt;

        const persistedQueryCacheIds = new Set(this._persistentQueryCacheResult);
-       let docsData: RxDocumentData<RxDocType>[] = [];
-       const changedDocIds: Set<string> = new Set();
-
        // query all docs updated > last persisted, limit to an arbitrary 1_000_000 (10x of what we consider our largest library)
        const {documents: changedDocs} = await this.collection.storageInstance.getChangedDocumentsSince(
            1_000_000,
            {id: '', lwt: Math.floor(Number(lwt)) - UPDATE_DRIFT}
        );
@@ -580,72 +581,24 @@
-       for (const changedDoc of changedDocs) {
-           const docWasInOldPersistedResults = persistedQueryCacheIds.has(changedDoc[primaryPath] as string);
-           const docMatchesNow = this.doesDocumentDataMatch(changedDoc);
+       const changedDocIds = new Set(changedDocs.map((d) => d[primaryPath] as string));

-           if (docWasInOldPersistedResults && !docMatchesNow && this.mangoQuery.limit && !limitBufferIds?.length) {
-               // Unfortunately if any doc was removed from the results since the last result,
-               // there is no way for us to be sure our calculated results are correct.
-               // So we should simply give up and re-exec the query.
-               this._persistentQueryCacheResult = value ?? undefined;
-               this._persistentQueryCacheResultLwt = lwt ?? undefined;
-               return;
-           }
-
-           if (docWasInOldPersistedResults) {
-               /*
-                * no need to fetch again, we already got the doc from the list of changed docs, and therefore we filter
-                * out docs that are no longer matching the query.
-                */
-               persistedQueryCacheIds.delete(changedDoc[primaryPath] as string);
-           }
-
-           // ignore deleted docs or docs that do not match the query
-           if (!docMatchesNow) {
-               continue;
-           }
-
-           // add to document cache
-           this.collection._docCache.getCachedRxDocument(changedDoc);
-
-           // add to docs
-           docsData.push(changedDoc);
-           changedDocIds.add(changedDoc[primaryPath] as string);
-       }
-
-       // Get the rest of the doc ids we need to consider:
-       const moreDocIdsToConsider = new Set(Array.from(persistedQueryCacheIds).concat(limitBufferIds ?? []));
-
-       const nonRestoredDocIds: string[] = [];
-       for (const docId of moreDocIdsToConsider) {
-           if (changedDocIds.has(docId)) {
-               // we already fetched this doc because it changed, don't get it again
-               continue;
-           }
+       const docIdsWeNeedToFetch = [...persistedQueryCacheIds, ...limitBufferIds].filter((id) => !changedDocIds.has(id));

-           // first try to fill from docCache
-           const docData = this.collection._docCache.getLatestDocumentDataIfExists(docId);
-           if (docData && this.doesDocumentDataMatch(docData)) {
-               docsData.push(docData);
-           }
+       // We use _queryCollectionByIds to fetch the remaining docs we need efficiently, pulling
+       // from the doc cache if we can (and the storageInstance by ids if we can't):
+       const otherPotentialMatchingDocs: RxDocumentData<RxDocType>[] = [];
+       await _queryCollectionByIds(this as any, otherPotentialMatchingDocs, docIdsWeNeedToFetch);

-           if (!docData) {
-               nonRestoredDocIds.push(docId);
+       // Now that we have all potential documents, we just filter (in memory) down to the ones that still match our query:
+       let docsData: RxDocumentData<RxDocType>[] = [];
+       for (const doc of changedDocs.concat(otherPotentialMatchingDocs)) {
+           if (this.doesDocumentDataMatch(doc)) {
+               docsData.push(doc);
            }
        }
-       // otherwise get from storage
-       if (nonRestoredDocIds.length > 0) {
-           const docsMap = await this.collection.storageInstance.findDocumentsById(nonRestoredDocIds, false);
-           Object.values(docsMap).forEach(docData => {
-               if (this.doesDocumentDataMatch(docData)) {
-                   this.collection._docCache.getCachedRxDocument(docData);
-                   docsData.push(docData);
-               }
-           });
-       }
-
+       // Sort the documents by the query's sort field:
        const normalizedMangoQuery = normalizeMangoQuery(
            this.collection.schema.jsonSchema,
            this.mangoQuery
        );
@@ -654,25 +607,36 @@
        const sortComparator = getSortComparator(this.collection.schema.jsonSchema, normalizedMangoQuery);
        const limit = normalizedMangoQuery.limit ? normalizedMangoQuery.limit : Infinity;
        docsData = docsData.sort(sortComparator);

+       // We know for sure that all persisted and limit buffer ids (and any changed docs sorted before them) are in
+       // the correct result set, but we can't be sure about anything past that point. So cut it off there:
+       const lastValidIndex = docsData.findLastIndex((d) => limitBufferIds.has(d[primaryPath] as string) || persistedQueryCacheIds.has(d[primaryPath] as string));
+       docsData = docsData.slice(0, lastValidIndex + 1);
+
+       // Now this is the trickiest part.
+       // If we somehow have fewer docs than the limit of our query
+       // (and this wasn't the case before persistence)
+       // then there is no way for us to know the correct results, and we re-exec:
+       const unchangedItemsMayNowBeInResults = (
+           this.mangoQuery.limit &&
+           docsData.length < this.mangoQuery.limit &&
+           persistedQueryCacheIds.size >= this.mangoQuery.limit
+       );
+       if (unchangedItemsMayNowBeInResults) {
+           this._persistentQueryCacheResult = undefined;
+           this._persistentQueryCacheResultLwt = undefined;
+           return;
+       }
+
+       // Our finalResults are the actual results of this query, and pastLimitItems are any remaining matching
+       // documents we have left over (past the limit).
        const pastLimitItems = docsData.slice(limit);
        const finalResults = docsData.slice(0, limit);

-       // If we had a limit buffer before, and now we don't... it means the limit buffer was exhausted.
-       // To be confident we're not missing anything, we need to re-exec the query:
-       if (limitBufferIds?.length && pastLimitItems.length === 0) {
-           this._persistentQueryCacheResult = value ?? undefined;
-           this._persistentQueryCacheResultLwt = lwt ?? undefined;
-           return;
-       }
        // If there are still items past the first LIMIT items, try to restore the limit buffer with them:
-       if (limitBufferIds?.length && pastLimitItems.length > 0) {
-           const lastLimitBufferIndex = pastLimitItems.findLastIndex((d) => limitBufferIds.includes(d[primaryPath] as string));
-           if (lastLimitBufferIndex !== -1){
-               // If the limit buffer still has room, simply restore it:
-               this._limitBufferResults = pastLimitItems.slice(0, Math.max(lastLimitBufferIndex + 1, this._limitBufferSize ?? 0));
-           } else {
-               this._limitBufferResults = [];
-           }
+       if (limitBufferIds.size && pastLimitItems.length > 0) {
+           this._limitBufferResults = pastLimitItems;
+       } else {
+           this._limitBufferResults = [];
        }

        // get query into the correct state
@@ -682,7 +646,6 @@
        // eslint-disable-next-line no-console
        console.timeEnd(`Restoring persistent querycache ${this.toString()}`);
-
    }
 }

diff --git a/test/unit/rx-query.test.ts b/test/unit/rx-query.test.ts
index 8854f0993ef..5e2265330ea 100644
--- a/test/unit/rx-query.test.ts
+++ b/test/unit/rx-query.test.ts
@@ -1769,7 +1769,7 @@ describe('rx-query.test.ts', () => {
            const result2 = await query2.exec();

            assert.strictEqual(query1._execOverDatabaseCount, 0);
-            assert.strictEqual(query2._execOverDatabaseCount, 1);
+            assert.strictEqual(query2._execOverDatabaseCount, 0);
            assert.deepStrictEqual(result2.map(item => item.passportId), ['1', '3']);

            collection.database.destroy();
@@ -1844,7 +1844,7 @@
            // wait 1 second so that not all docs are included in lwt
            await new Promise((resolve) => {
-                setTimeout(resolve, 1000);
+                setTimeout(resolve, 500);
            });

            // Cache a limited query:
@@ -2016,16 +2016,17 @@
            assert.deepStrictEqual(updatedResults.map(h => h.passportId), ['2', '4']);
            assert.strictEqual(queryAgain._limitBufferResults?.length, 1);

-            // But if we go further, and use the last item from the limit buffer, we'll have to re-exec:
+            // But if we go further, and use the last items from the limit buffer, we'll have to re-exec:
            uncacheRxQuery(collection._queryCache, queryAgain);
            await collection.find({ selector: { passportId: '4' } }).remove();
+            await collection.find({ selector: { passportId: '5' } }).remove();
            const queryFinal = collection.find(query.mangoQuery);
            queryFinal.enableLimitBuffer(3).enablePersistentQueryCache(cache);

            const finalResults = await queryFinal.exec();
            assert.strictEqual(queryFinal._execOverDatabaseCount, 1);
-            assert.deepStrictEqual(finalResults.map(h => h.passportId), ['2', '5']);
+            assert.deepStrictEqual(finalResults.map(h => h.passportId), ['2']);

            collection.database.destroy();
        });
@@ -2107,6 +2108,62 @@
            // how they would be sorted relative to other documents
            assert.strictEqual(queryAgain._limitBufferResults?.length, 0);

+            simulateNewSession(collection);
+
+            // If one more doc is removed from our results, we will HAVE to re-exec to ensure
+            // correct results, test that:
+            await collection.find({ selector: { passportId: '3' } }).update({
+                $set: { age: 1 }
+            });
+
+            const queryFinal = collection.find(query.mangoQuery);
+            queryFinal.enableLimitBuffer(2).enablePersistentQueryCache(cache);
+
+            const finalResults = await queryFinal.exec();
+
+            // Query re-execs, and gives correct results:
+            assert.strictEqual(queryFinal._execOverDatabaseCount, 1);
+            assert.deepStrictEqual(finalResults.map(h => h.passportId), ['4', '5']);
+
+            // When we re-exec, the limit buffer will also get filled:
+            assert.deepStrictEqual(queryFinal._limitBufferResults?.map(h => h.passportId), ['6', '7']);
+
+            collection.database.destroy();
+        });
+
+        it('Handles case where we have fewer than LIMIT matches', async () => {
+            const { collection, cache } = await setUpLimitBufferSituation();
+
+            const query = collection.find({ limit: 3, sort: [{age: 'asc'}], selector: { age: { $lt: 45 } } });
+            query.enableLimitBuffer(2).enablePersistentQueryCache(cache);
+            await query.exec();
+            simulateNewSession(collection);
+
+            // Remove something; results are still correct and there is no re-exec:
+            await collection.find({ selector: { passportId: '1' } }).remove();
+
+            const queryRemoved = collection.find(query.mangoQuery);
+            queryRemoved.enableLimitBuffer(2).enablePersistentQueryCache(cache);
+            const removedResults = await queryRemoved.exec();
+            assert.strictEqual(queryRemoved._execOverDatabaseCount, 0);
+            assert.deepStrictEqual(removedResults.map(h => h.passportId), ['2']);
+
+            simulateNewSession(collection);
+
+            // Now add some matching docs. Since they changed, they should now be in the results with no re-exec.
+            await collection.find({ selector: { passportId: '5' } }).update({
+                $set: { age: 1 }
+            });
+            await collection.bulkUpsert([
+                schemaObjects.human('6', 2),
+                schemaObjects.human('7', 3),
+            ]);
+            const queryAdded = collection.find(query.mangoQuery);
+            queryAdded.enableLimitBuffer(2).enablePersistentQueryCache(cache);
+            const addedResults = await queryAdded.exec();
+            assert.strictEqual(queryAdded._execOverDatabaseCount, 0);
+            assert.deepStrictEqual(addedResults.map(h => h.passportId), ['5', '6', '7']);
+
            collection.database.destroy();
        });
    });
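Note for reviewers: the restoration logic above reads persisted entries such as qc:<queryId>:lwt and qc:<queryId>:lb through the backend passed to enablePersistentQueryCache(). A minimal in-memory backend along the lines below is enough to exercise the new code path locally. This is an illustrative sketch, not the interface shipped in this repo: only getItem is demonstrated by the diff, so the class name, the setItem method, and the usage comments are assumptions.

// Hypothetical in-memory persistent query cache backend (illustration only --
// the real backend interface in this codebase may differ; the diff above only
// shows getItem being called, so setItem here is an assumption).
export class InMemoryPersistentQueryCacheBackend {
    private readonly store = new Map<string, string | string[]>();

    async getItem<T extends string | string[]>(key: string): Promise<T | null> {
        const value = this.store.get(key);
        return value === undefined ? null : (value as T);
    }

    async setItem<T extends string | string[]>(key: string, value: T): Promise<T> {
        this.store.set(key, value);
        return value;
    }
}

// Usage sketch, mirroring the tests above (collection and setup helpers assumed):
//   const cache = new InMemoryPersistentQueryCacheBackend();
//   const query = collection.find({ limit: 3, sort: [{ age: 'asc' }], selector: { age: { $lt: 45 } } });
//   query.enableLimitBuffer(2).enablePersistentQueryCache(cache);
//   await query.exec();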