From 20da204c23a87c96a445f3174fc85cb05e4932a6 Mon Sep 17 00:00:00 2001 From: Tristan Homsi Date: Thu, 7 Dec 2023 01:04:48 -0500 Subject: [PATCH 01/11] persist limit buffers mvp --- src/rx-query.ts | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/src/rx-query.ts b/src/rx-query.ts index c1ff6b185c6..eece164b686 100644 --- a/src/rx-query.ts +++ b/src/rx-query.ts @@ -555,7 +555,13 @@ export class RxQueryBase< } // eslint-disable-next-line no-console console.time(`Restoring persistent querycache ${this.toString()}`); - const lwt = (await this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lwt`)) as string | null; + + const results = await Promise.all([ + this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lwt`), + this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lb`), + ]); + const [lwt, limitBufferIds] = [results[0] as string | null, results[1] as string[] | null]; + const primaryPath = this.collection.schema.primaryPath; this._persistentQueryCacheResult = value ?? undefined; @@ -610,7 +616,9 @@ export class RxQueryBase< // fetch remaining persisted doc ids const nonRestoredDocIds: string[] = []; - for (const docId of persistedQueryCacheIds) { + + const moreDocIdsToConsider = Array.from(persistedQueryCacheIds).concat(limitBufferIds ?? []); + for (const docId of moreDocIdsToConsider) { // first try to fill from docCache const docData = this.collection._docCache.getLatestDocumentDataIfExists(docId); if (docData && this.doesDocumentDataMatch(docData)) { @@ -636,17 +644,24 @@ export class RxQueryBase< this.mangoQuery ); const sortComparator = getSortComparator(this.collection.schema.jsonSchema, normalizedMangoQuery); - const skip = normalizedMangoQuery.skip ? normalizedMangoQuery.skip : 0; const limit = normalizedMangoQuery.limit ? normalizedMangoQuery.limit : Infinity; - const skipPlusLimit = skip + limit; docsData = docsData.sort(sortComparator); - docsData = docsData.slice(skip, skipPlusLimit); + const pastLimitItems = docsData.slice(limit); + const finalResults = docsData.slice(0, limit); + + // Restore the limit buffer, if we can: + if (pastLimitItems && limitBufferIds?.length) { + const lastLimitBufferIndex = pastLimitItems.findLastIndex((d) => limitBufferIds.includes(d[primaryPath] as string)); + if (lastLimitBufferIndex > 0) { + this._limitBufferResults = pastLimitItems.slice(0, lastLimitBufferIndex + 1); + } + } // get query into the correct state this._lastEnsureEqual = now(); this._latestChangeEvent = this.collection._changeEventBuffer.counter; - this._setResultData(docsData); + this._setResultData(finalResults); } // eslint-disable-next-line no-console console.timeEnd(`Restoring persistent querycache ${this.toString()}`); @@ -889,8 +904,18 @@ async function updatePersistentQueryCache(rxQuery: RxQueryBase d[rxQuery.collection.schema.primaryPath] as string); + promises.push(backend.setItem(`qc:${String(key)}:lb`, limitBufferIds)); + } + + await Promise.all(promises); // eslint-disable-next-line no-console console.timeEnd(`Query persistence: persisting results of ${JSON.stringify(rxQuery.mangoQuery)}`); From 0473da088e9ff9c2b67498c3c61986e435dfebb8 Mon Sep 17 00:00:00 2001 From: Tristan Homsi Date: Fri, 8 Dec 2023 02:51:53 -0500 Subject: [PATCH 02/11] add tests for persisted limit buffer, and improve the logic a lot to handle all edge cases --- src/rx-query.ts | 35 +++--- test/unit.test.ts | 3 +- test/unit/rx-query.test.ts | 214 +++++++++++++++++++++++++++++-------- 3 files changed, 191 insertions(+), 61 deletions(-) diff --git a/src/rx-query.ts b/src/rx-query.ts index eece164b686..6e7f44eb100 100644 --- a/src/rx-query.ts +++ b/src/rx-query.ts @@ -572,6 +572,7 @@ export class RxQueryBase< const persistedQueryCacheIds = new Set(this._persistentQueryCacheResult); let docsData: RxDocumentData[] = []; + const changedDocIds: Set = new Set(); // query all docs updated > last persisted, limit to an arbitrary 1_000_000 (10x of what we consider our largest library) const {documents: changedDocs} = await this.collection.storageInstance.getChangedDocumentsSince( @@ -585,7 +586,7 @@ export class RxQueryBase< const docWasInOldPersistedResults = persistedQueryCacheIds.has(changedDoc[primaryPath] as string); const docMatchesNow = this.doesDocumentDataMatch(changedDoc); - if (docWasInOldPersistedResults && !docMatchesNow && this.mangoQuery.limit) { + if (docWasInOldPersistedResults && !docMatchesNow && this.mangoQuery.limit && !limitBufferIds?.length) { // Unfortunately if any doc was removed from the results since the last result, // there is no way for us to be sure our calculated results are correct. // So we should simply give up and re-exec the query. @@ -594,14 +595,6 @@ export class RxQueryBase< return; } - if (docWasInOldPersistedResults) { - /* - * no need to fetch again, we already got the doc from the list of changed docs, and therefore we filter - * deleted docs as well - */ - persistedQueryCacheIds.delete(changedDoc[primaryPath] as string); - } - // ignore deleted docs or docs that do not match the query if (!docMatchesNow) { continue; @@ -612,13 +605,19 @@ export class RxQueryBase< // add to docs docsData.push(changedDoc); + changedDocIds.add(changedDoc[primaryPath] as string); } - // fetch remaining persisted doc ids - const nonRestoredDocIds: string[] = []; + // Get the rest of the doc ids we need to consider: + const moreDocIdsToConsider = new Set(Array.from(persistedQueryCacheIds).concat(limitBufferIds ?? [])); - const moreDocIdsToConsider = Array.from(persistedQueryCacheIds).concat(limitBufferIds ?? []); + const nonRestoredDocIds: string[] = []; for (const docId of moreDocIdsToConsider) { + if (changedDocIds.has(docId)) { + // we already fetched this doc because it changed, don't get it again + continue; + } + // first try to fill from docCache const docData = this.collection._docCache.getLatestDocumentDataIfExists(docId); if (docData && this.doesDocumentDataMatch(docData)) { @@ -653,8 +652,15 @@ export class RxQueryBase< // Restore the limit buffer, if we can: if (pastLimitItems && limitBufferIds?.length) { const lastLimitBufferIndex = pastLimitItems.findLastIndex((d) => limitBufferIds.includes(d[primaryPath] as string)); - if (lastLimitBufferIndex > 0) { - this._limitBufferResults = pastLimitItems.slice(0, lastLimitBufferIndex + 1); + if (lastLimitBufferIndex === -1) { + // We had a limit buffer before, now we don't. This means it was an exhausted, + // and to be confident we're not missing anyhing, we need to re-exec the query: + this._persistentQueryCacheResult = value ?? undefined; + this._persistentQueryCacheResultLwt = lwt ?? undefined; + return; + } else { + // If the limit buffer still has room, simply restore it: + this._limitBufferResults = pastLimitItems.slice(0, Math.max(lastLimitBufferIndex + 1, this._limitBufferSize ?? 0)); } } @@ -909,7 +915,6 @@ async function updatePersistentQueryCache(rxQuery: RxQueryBase d[rxQuery.collection.schema.primaryPath] as string); promises.push(backend.setItem(`qc:${String(key)}:lb`, limitBufferIds)); diff --git a/test/unit.test.ts b/test/unit.test.ts index bb9a1a90b8b..69172c7c92d 100644 --- a/test/unit.test.ts +++ b/test/unit.test.ts @@ -1,6 +1,8 @@ import './unit/init.test'; import './unit/util.test'; +import './unit/rx-query.test'; + /** * Helpers that @@ -36,7 +38,6 @@ import './unit/validate.test'; import './unit/attachments.test'; import './unit/attachments-compression.test'; import './unit/encryption.test'; -import './unit/rx-query.test'; import './unit/cross-instance.test'; import './unit/local-documents.test'; import './unit/change-event-buffer.test'; diff --git a/test/unit/rx-query.test.ts b/test/unit/rx-query.test.ts index b8124b0d8b0..fee5339188e 100644 --- a/test/unit/rx-query.test.ts +++ b/test/unit/rx-query.test.ts @@ -1596,7 +1596,7 @@ describe('rx-query.test.ts', () => { const query = collection.find({ limit: 1 }); const cache = new Cache(); - query.enablePersistentQueryCache(cache); + query.enableLimitBuffer(5).enablePersistentQueryCache(cache); const human1 = schemaObjects.human(); const human2 = schemaObjects.human(); @@ -1604,7 +1604,7 @@ describe('rx-query.test.ts', () => { await collection.bulkInsert([human1, human2]); await query.exec(); - assert.strictEqual(cache.size, 2); + assert.strictEqual(cache.size, 3); collection.database.destroy(); }); @@ -1624,7 +1624,7 @@ describe('rx-query.test.ts', () => { const cache = new Cache(); await cache.setItem(`qc:${queryId}`, [human1.passportId, human2.passportId]); await cache.setItem(`qc:${queryId}:lwt`, `${now()}`); - query.enablePersistentQueryCache(cache); + query.enableLimitBuffer(5).enablePersistentQueryCache(cache); // execute query const result = await query.exec(); @@ -1650,7 +1650,7 @@ describe('rx-query.test.ts', () => { const cache = new Cache(); await cache.setItem(`qc:${queryId}`, [human1.passportId]); await cache.setItem(`qc:${queryId}:lwt`, `${now()}`); - query1.enablePersistentQueryCache(cache); + query1.enableLimitBuffer(5).enablePersistentQueryCache(cache); // execute query const result1 = await query1.exec(); @@ -1665,7 +1665,7 @@ describe('rx-query.test.ts', () => { clearQueryCache(collection); const query2 = collection.find({ selector: { age: human1Age }}); - query2.enablePersistentQueryCache(cache); + query2.enableLimitBuffer(5).enablePersistentQueryCache(cache); const result2 = await query2.exec(); @@ -1691,7 +1691,7 @@ describe('rx-query.test.ts', () => { const cache = new Cache(); await cache.setItem(`qc:${queryId}`, [human1.passportId, human2.passportId]); await cache.setItem(`qc:${queryId}:lwt`, `${now()}`); - query.enablePersistentQueryCache(cache); + query.enableLimitBuffer(5).enablePersistentQueryCache(cache); const result1 = await query.exec(); @@ -1728,7 +1728,7 @@ describe('rx-query.test.ts', () => { clearQueryCache(collection); const query2 = collection.find({ limit: 2, sort: [{age: 'asc'}] }); - query2.enablePersistentQueryCache(cache); + query2.enableLimitBuffer(5).enablePersistentQueryCache(cache); const result2 = await query2.exec(); @@ -1762,7 +1762,7 @@ describe('rx-query.test.ts', () => { clearQueryCache(collection); const query2 = collection.find({ limit: 2, sort: [{age: 'asc'}] }); - query2.enablePersistentQueryCache(cache); + query2.enableLimitBuffer(5).enablePersistentQueryCache(cache); assert.strictEqual(cache.getItem(`qc:${queryId}`).length, 3); @@ -1785,7 +1785,7 @@ describe('rx-query.test.ts', () => { // fill cache const cache = new Cache(); const query1 = collection.find({limit: 1}); - query1.enablePersistentQueryCache(cache); + query1.enableLimitBuffer(5).enablePersistentQueryCache(cache); const queryId = query1.persistentQueryId(); const result1 = await query1.exec(); @@ -1799,7 +1799,7 @@ describe('rx-query.test.ts', () => { await cache.setItem(`qc:${queryId}:lwt`, `${lwt}`); const query2 = collection.find({limit: 1}); - query2.enablePersistentQueryCache(cache); + query2.enableLimitBuffer(5).enablePersistentQueryCache(cache); await query2._persistentQueryCacheLoaded; await result1[0].remove(); @@ -1821,7 +1821,7 @@ describe('rx-query.test.ts', () => { const query = collection.find({ limit: 3 }); const cache = new Cache(); - query.enablePersistentQueryCache(cache); + query.enableLimitBuffer(5).enablePersistentQueryCache(cache); const result = await query.exec(); @@ -1831,49 +1831,173 @@ describe('rx-query.test.ts', () => { collection.database.destroy(); }); - it('gives correct limit results when items were removed', async () => { - const {collection} = await setUpPersistentQueryCacheCollection(); - const human1 = schemaObjects.human('1', 30); - const human2 = schemaObjects.human('2', 40); - const human3 = schemaObjects.human('3', 50); - await collection.bulkInsert([human1, human2, human3]); + describe.only('persisting queries with limit buffers', () => { + async function setUpLimitBufferSituation() { + const {collection} = await setUpPersistentQueryCacheCollection(); + await collection.bulkInsert([ + schemaObjects.human('1', 30), + schemaObjects.human('2', 40), + schemaObjects.human('3', 50), + schemaObjects.human('4', 60), + schemaObjects.human('5', 70), + ]); + + // wait 1 second so that not all docs are included in lwt + await new Promise((resolve) => { + setTimeout(resolve, 1000); + }); - // wait 1 second so that not all docs are included in lwt - await new Promise((resolve) => { - setTimeout(resolve, 1000); - return; + // Cache a limited query: + const query = collection.find({ limit: 2, sort: [{age: 'asc'}], selector: { age: { $gt: 10 } } }); + const cache = new Cache(); + + return { query, cache, collection }; + } + + // This is how it should operate when we don't persist limit buffers: + it('limit buffer not enabled, still gives correct results through re-execution', async () => { + const { collection, query, cache} = await setUpLimitBufferSituation(); + + // persist with no limit buffer enabled + await query.enablePersistentQueryCache(cache); + const originalResults = await query.exec(); + assert.deepStrictEqual(originalResults.map(h => h.passportId), ['1', '2']); + + // Now, get into a state where that query is no longer in memory (eg new tab) + // (but, the query should still be persisted on disk) + uncacheRxQuery(collection._queryCache, query); + assert.strictEqual(cache.size, 2); + + // while the query is not in memory, remove one of the items from the query results + await collection.find({selector: { passportId: '1'}}).update({ + $set: { age: 1 } + }); + + // now when we create the query again, it has no way of knowing how to fill the missing item + const queryAgain = collection.find(query.mangoQuery); + assert.strictEqual(queryAgain._execOverDatabaseCount, 0); + + await queryAgain.enablePersistentQueryCache(cache); + const updatedResults = await queryAgain.exec(); + + // We must re-exec the query to make it correct. + assert.strictEqual(queryAgain._execOverDatabaseCount, 1); + assert.deepStrictEqual(updatedResults.map(h => h.passportId), ['2', '3']); + collection.database.destroy(); }); - // Cache a limited query: - const query = collection.find({ limit: 2, sort: [{age: 'asc'}], selector: { age: { $gt: 10 } } }); - const cache = new Cache(); - await query.enablePersistentQueryCache(cache); - const originalResults = await query.exec(); - assert.deepStrictEqual(originalResults.map(h => h.passportId), ['1', '2']); + it('limit buffer enabled, restores changed results correctly with no re-exec', async () => { + const { collection, query, cache} = await setUpLimitBufferSituation(); - // Now, get into a state where that query is no longer in memory (eg new tab) - // (but, the query should still be persisted on disk) - uncacheRxQuery(collection._queryCache, query); - assert.strictEqual(cache.size, 2); + // Persist WITH the limit buffer enabled + query.enableLimitBuffer(5).enablePersistentQueryCache(cache); - // while the query is not in memory, remove one of the items from the query results - await collection.find({selector: { passportId: '1'}}).update({ - $set: { - age: 1, - } + const originalResults = await query.exec(); + assert.deepStrictEqual(originalResults.map(h => h.passportId), ['1', '2']); + assert.strictEqual(query._limitBufferResults?.length, 3); + + uncacheRxQuery(collection._queryCache, query); + assert.strictEqual(cache.size, 3); + + // remove one of the items from the query results + await collection.find({ selector: { passportId: '1' } }).update({ + $set: { age: 1 } + }); + + // now when we create the query again, it will fill in the missing element from the limit buffer + const queryAgain = collection.find(query.mangoQuery); + queryAgain.enableLimitBuffer(5).enablePersistentQueryCache(cache); + + const updatedResults = await queryAgain.exec(); + + // The query should use the limit buffer to restore the results, and not need to re-exec the query + assert.strictEqual(queryAgain._execOverDatabaseCount, 0); + assert.deepStrictEqual(updatedResults.map(h => h.passportId), ['2', '3']); + + // There should now only be 2 items left in the limit buffer, it used the first one up to fill the results + assert.strictEqual(queryAgain._limitBufferResults?.length, 2); + + collection.database.destroy(); }); - // now when we create the query again, it has no way of knowing how to fill the missing item - const queryAgain = collection.find({ limit: 2, sort: [{age: 'asc'}], selector: { age: { $gt: 10 } } }); - assert.strictEqual(queryAgain._execOverDatabaseCount, 0); + it('limit buffer enabled, but gets exhausted', async () => { + const { collection, query, cache} = await setUpLimitBufferSituation(); - await queryAgain.enablePersistentQueryCache(cache); - const updatedResults = await queryAgain.exec(); + // Persist WITH the limit buffer enabled, but only one doc + query.enableLimitBuffer(1).enablePersistentQueryCache(cache); + await query.exec(); + uncacheRxQuery(collection._queryCache, query); - // We must re-exec the query to make it correct. - assert.strictEqual(queryAgain._execOverDatabaseCount, 1); - assert.deepStrictEqual(updatedResults.map(h => h.passportId), ['2', '3']); - collection.database.destroy(); + // remove two of the items from the query results + await collection.find({ selector: { passportId: '1' } }).update({ + $set: { age: 1 } + }); + await collection.find({ selector: { passportId: '2' } }).update({ + $set: { age: 1 } + }); + + // now when we create the query again, it will fill in the missing element from the limit buffer + // but then still need another item to hit the limit=2 + const queryAgain = collection.find(query.mangoQuery); + queryAgain.enableLimitBuffer(1).enablePersistentQueryCache(cache); + + const updatedResults = await queryAgain.exec(); + + // The query will have to still re-exec, but give the correct results + assert.strictEqual(queryAgain._execOverDatabaseCount, 1); + assert.deepStrictEqual(updatedResults.map(h => h.passportId), ['3', '4']); + + // And re-fill the 1 item in limit buffer: + assert.strictEqual(queryAgain._limitBufferResults?.length, 1); + assert.strictEqual(queryAgain._limitBufferResults?.[0].passportId, '5'); + + collection.database.destroy(); + }); + + it('limit buffer enabled, doc added and limit buffer items changed, still restores correctly', async () => { + const { collection, query, cache} = await setUpLimitBufferSituation(); + + // Persist WITH the limit buffer enabled + query.enableLimitBuffer(5).enablePersistentQueryCache(cache); + + await query.exec(); + + uncacheRxQuery(collection._queryCache, query); + + // Let's make 3 changes: + // 1. remove both of the original results + // 2. add in a new doc that should now be in the results + // 3. modify one of the items in the limit buffer to change the correct order there + await collection.find({ selector: { passportId: '1' } }).update({ + $set: { age: 1 } + }); + await collection.find({ selector: { passportId: '2' } }).update({ + $set: { age: 1 } + }); + // the new item should now be the first result, since it has the lowest age + await collection.bulkUpsert([ + schemaObjects.human('6', 20), + ]); + // change what would be the next result (passport id 3) to still match the filter, but now be last (so not in the results) + await collection.find({ selector: { passportId: '3' } }).update({ + $set: { age: 100 } + }); + + const queryAgain = collection.find(query.mangoQuery); + queryAgain.enableLimitBuffer(5).enablePersistentQueryCache(cache); + const updatedResults = await queryAgain.exec(); + + // The query should use the limit buffer to restore the results, and not need to re-exec the query + assert.strictEqual(queryAgain._execOverDatabaseCount, 0); + + // But it should also correctly fill in the new document into the correct position, and also handle the sort change + assert.deepStrictEqual(updatedResults.map(h => h.passportId), ['6', '4']); + + // The two items in limit buffer should be in the correct order: + assert.deepStrictEqual(queryAgain._limitBufferResults?.map((d) => d.passportId), ['5', '3']); + + collection.database.destroy(); + }); }); }); }); From 9fc54f30dfee62a4d80fe03843755ef6a42406f4 Mon Sep 17 00:00:00 2001 From: Tristan Homsi Date: Fri, 8 Dec 2023 02:53:06 -0500 Subject: [PATCH 03/11] remove .only from tests --- test/unit.test.ts | 3 +-- test/unit/rx-query.test.ts | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/test/unit.test.ts b/test/unit.test.ts index 69172c7c92d..bb9a1a90b8b 100644 --- a/test/unit.test.ts +++ b/test/unit.test.ts @@ -1,8 +1,6 @@ import './unit/init.test'; import './unit/util.test'; -import './unit/rx-query.test'; - /** * Helpers that @@ -38,6 +36,7 @@ import './unit/validate.test'; import './unit/attachments.test'; import './unit/attachments-compression.test'; import './unit/encryption.test'; +import './unit/rx-query.test'; import './unit/cross-instance.test'; import './unit/local-documents.test'; import './unit/change-event-buffer.test'; diff --git a/test/unit/rx-query.test.ts b/test/unit/rx-query.test.ts index fee5339188e..c11f4d4feaf 100644 --- a/test/unit/rx-query.test.ts +++ b/test/unit/rx-query.test.ts @@ -1831,7 +1831,7 @@ describe('rx-query.test.ts', () => { collection.database.destroy(); }); - describe.only('persisting queries with limit buffers', () => { + describe('persisting queries with limit buffers', () => { async function setUpLimitBufferSituation() { const {collection} = await setUpPersistentQueryCacheCollection(); await collection.bulkInsert([ From 2ab2bdb3c9ed6fddbcdfbe130ac7349dae698863 Mon Sep 17 00:00:00 2001 From: Tristan Homsi Date: Fri, 8 Dec 2023 03:39:19 -0500 Subject: [PATCH 04/11] add yet another edge case test and catch it in the logic --- src/rx-query.ts | 21 ++++++----- test/unit/rx-query.test.ts | 71 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 9 deletions(-) diff --git a/src/rx-query.ts b/src/rx-query.ts index 6e7f44eb100..126a99a33aa 100644 --- a/src/rx-query.ts +++ b/src/rx-query.ts @@ -649,18 +649,21 @@ export class RxQueryBase< const pastLimitItems = docsData.slice(limit); const finalResults = docsData.slice(0, limit); - // Restore the limit buffer, if we can: - if (pastLimitItems && limitBufferIds?.length) { + // If we had a limit buffer before, and now we don't... it means the limit buffer was exhausted. + // To be confident we're not missing anything, we need to re-exec the query: + if (limitBufferIds?.length && pastLimitItems.length === 0) { + this._persistentQueryCacheResult = value ?? undefined; + this._persistentQueryCacheResultLwt = lwt ?? undefined; + return; + } + // If there are still items past the first LIMIT items, try to restore the limit buffer with them: + if (limitBufferIds?.length && pastLimitItems.length > 0) { const lastLimitBufferIndex = pastLimitItems.findLastIndex((d) => limitBufferIds.includes(d[primaryPath] as string)); - if (lastLimitBufferIndex === -1) { - // We had a limit buffer before, now we don't. This means it was an exhausted, - // and to be confident we're not missing anyhing, we need to re-exec the query: - this._persistentQueryCacheResult = value ?? undefined; - this._persistentQueryCacheResultLwt = lwt ?? undefined; - return; - } else { + if (lastLimitBufferIndex !== -1){ // If the limit buffer still has room, simply restore it: this._limitBufferResults = pastLimitItems.slice(0, Math.max(lastLimitBufferIndex + 1, this._limitBufferSize ?? 0)); + } else { + this._limitBufferResults = []; } } diff --git a/test/unit/rx-query.test.ts b/test/unit/rx-query.test.ts index c11f4d4feaf..acd81c7f911 100644 --- a/test/unit/rx-query.test.ts +++ b/test/unit/rx-query.test.ts @@ -1954,6 +1954,42 @@ describe('rx-query.test.ts', () => { collection.database.destroy(); }); + it('limit buffer enabled, with a bunch of deletions', async () => { + const { collection, query, cache} = await setUpLimitBufferSituation(); + + // Persist WITH the limit buffer enabled + query.enableLimitBuffer(3).enablePersistentQueryCache(cache); + await query.exec(); + uncacheRxQuery(collection._queryCache, query); + + // delete one item from the results, and one item from the limit buffer: + await collection.find({ selector: { passportId: '1' } }).remove(); + await collection.find({ selector: { passportId: '3' } }).remove(); + + const queryAgain = collection.find(query.mangoQuery); + queryAgain.enableLimitBuffer(3).enablePersistentQueryCache(cache); + + const updatedResults = await queryAgain.exec(); + + // The query should be able to fill up from the limit buffer + assert.strictEqual(queryAgain._execOverDatabaseCount, 0); + assert.deepStrictEqual(updatedResults.map(h => h.passportId), ['2', '4']); + assert.strictEqual(queryAgain._limitBufferResults?.length, 1); + + // But if we go further, and use the last item from the limit buffer, we'll have to re-exec: + uncacheRxQuery(collection._queryCache, queryAgain); + await collection.find({ selector: { passportId: '4' } }).remove(); + + const queryFinal = collection.find(query.mangoQuery); + queryFinal.enableLimitBuffer(3).enablePersistentQueryCache(cache); + + const finalResults = await queryFinal.exec(); + assert.strictEqual(queryFinal._execOverDatabaseCount, 1); + assert.deepStrictEqual(finalResults.map(h => h.passportId), ['2', '5']); + + collection.database.destroy(); + }); + it('limit buffer enabled, doc added and limit buffer items changed, still restores correctly', async () => { const { collection, query, cache} = await setUpLimitBufferSituation(); @@ -1998,6 +2034,41 @@ describe('rx-query.test.ts', () => { collection.database.destroy(); }); + + it('limit buffer enabled, all items in buffer used but we have more non-buffer items', async () => { + const { collection, query, cache} = await setUpLimitBufferSituation(); + + // Persist WITH the limit buffer enabled + query.enableLimitBuffer(2).enablePersistentQueryCache(cache); + await query.exec(); + uncacheRxQuery(collection._queryCache, query); + + // remove the 2 results, so we use up the 2 items in the limit buffer: + await collection.find({ selector: { passportId: '1' } }).remove(); + await collection.find({ selector: { passportId: '2' } }).update({ + $set: { age: 1 } + }); + // But also add in some new docs, that match the filter but are sorted last + await collection.bulkUpsert([ + schemaObjects.human('6', 90), + schemaObjects.human('7', 90), + ]); + + const queryAgain = collection.find(query.mangoQuery); + queryAgain.enableLimitBuffer(2).enablePersistentQueryCache(cache); + + const updatedResults = await queryAgain.exec(); + + // In this case we can use the limit buffer without re-execing, and still get correct results: + assert.strictEqual(queryAgain._execOverDatabaseCount, 0); + assert.deepStrictEqual(updatedResults.map(h => h.passportId), ['3', '4']); + + // But the new limit buffer will be empty -- we can't use the new documents because we don't know + // how they would be sorted relative to other documents + assert.strictEqual(queryAgain._limitBufferResults?.length, 0); + + collection.database.destroy(); + }); }); }); }); From 058e9c05225ab7591c15efad613f1aa43b281b48 Mon Sep 17 00:00:00 2001 From: Tristan Homsi Date: Fri, 8 Dec 2023 04:19:56 -0500 Subject: [PATCH 05/11] catch another edge case --- src/rx-query.ts | 20 +++++++++---- test/unit/rx-query.test.ts | 60 +++++++++++++++++++++++++++++++------- 2 files changed, 65 insertions(+), 15 deletions(-) diff --git a/src/rx-query.ts b/src/rx-query.ts index 126a99a33aa..152fe1aca5b 100644 --- a/src/rx-query.ts +++ b/src/rx-query.ts @@ -595,6 +595,14 @@ export class RxQueryBase< return; } + if (docWasInOldPersistedResults) { + /* + * no need to fetch again, we already got the doc from the list of changed docs, and therefore we filter + * out docs that are no longer matching the query. + */ + persistedQueryCacheIds.delete(changedDoc[primaryPath] as string); + } + // ignore deleted docs or docs that do not match the query if (!docMatchesNow) { continue; @@ -631,11 +639,13 @@ export class RxQueryBase< // otherwise get from storage if (nonRestoredDocIds.length > 0) { - const docsMap = await this.collection.storageInstance.findDocumentsById(nonRestoredDocIds, false); - Object.values(docsMap).forEach(docData => { - this.collection._docCache.getCachedRxDocument(docData); - docsData.push(docData); - }); + const docsMap = await this.collection.storageInstance.findDocumentsById(nonRestoredDocIds, false); + Object.values(docsMap).forEach(docData => { + if (this.doesDocumentDataMatch(docData)) { + this.collection._docCache.getCachedRxDocument(docData); + docsData.push(docData); + } + }); } const normalizedMangoQuery = normalizeMangoQuery( diff --git a/test/unit/rx-query.test.ts b/test/unit/rx-query.test.ts index acd81c7f911..8854f0993ef 100644 --- a/test/unit/rx-query.test.ts +++ b/test/unit/rx-query.test.ts @@ -15,7 +15,7 @@ import { promiseWait, randomCouchString, ensureNotFalsy, - now, uncacheRxQuery, + now, uncacheRxQuery, RxCollection, } from '../../'; import { firstValueFrom } from 'rxjs'; @@ -1854,6 +1854,11 @@ describe('rx-query.test.ts', () => { return { query, cache, collection }; } + function simulateNewSession(collection: RxCollection) { + clearQueryCache(collection); + collection._docCache.cacheItemByDocId.clear(); + } + // This is how it should operate when we don't persist limit buffers: it('limit buffer not enabled, still gives correct results through re-execution', async () => { const { collection, query, cache} = await setUpLimitBufferSituation(); @@ -1865,7 +1870,7 @@ describe('rx-query.test.ts', () => { // Now, get into a state where that query is no longer in memory (eg new tab) // (but, the query should still be persisted on disk) - uncacheRxQuery(collection._queryCache, query); + simulateNewSession(collection); assert.strictEqual(cache.size, 2); // while the query is not in memory, remove one of the items from the query results @@ -1886,7 +1891,7 @@ describe('rx-query.test.ts', () => { collection.database.destroy(); }); - it('limit buffer enabled, restores changed results correctly with no re-exec', async () => { + it('limit buffer enabled, restores normal changes, results correctly with no re-exec', async () => { const { collection, query, cache} = await setUpLimitBufferSituation(); // Persist WITH the limit buffer enabled @@ -1895,8 +1900,6 @@ describe('rx-query.test.ts', () => { const originalResults = await query.exec(); assert.deepStrictEqual(originalResults.map(h => h.passportId), ['1', '2']); assert.strictEqual(query._limitBufferResults?.length, 3); - - uncacheRxQuery(collection._queryCache, query); assert.strictEqual(cache.size, 3); // remove one of the items from the query results @@ -1904,6 +1907,43 @@ describe('rx-query.test.ts', () => { $set: { age: 1 } }); + simulateNewSession(collection); + + // now when we create the query again, it should fill in the missing element from the limit buffer + const queryAgain = collection.find(query.mangoQuery); + queryAgain.enableLimitBuffer(5).enablePersistentQueryCache(cache); + + const updatedResults = await queryAgain.exec(); + + // The query should use the limit buffer to restore the results, and not need to re-exec the query + assert.strictEqual(queryAgain._execOverDatabaseCount, 0); + assert.deepStrictEqual(updatedResults.map(h => h.passportId), ['2', '3']); + + // There should now only be 2 items left in the limit buffer, it used the first one up to fill the results + assert.strictEqual(queryAgain._limitBufferResults?.length, 2); + + collection.database.destroy(); + }); + + it('limit buffer enabled, restores missing changes, results correctly with no re-exec', async () => { + const { collection, query, cache} = await setUpLimitBufferSituation(); + + // Persist WITH the limit buffer enabled + query.enableLimitBuffer(5).enablePersistentQueryCache(cache); + + const originalResults = await query.exec(); + assert.deepStrictEqual(originalResults.map(h => h.passportId), ['1', '2']); + assert.strictEqual(query._limitBufferResults?.length, 3); + + // uncache the query first, before changes are made + simulateNewSession(collection); + assert.strictEqual(cache.size, 3); + + // remove one of the items from the query results while query is not listening in memory + await collection.find({ selector: { passportId: '1' } }).update({ + $set: { age: 1 } + }); + // now when we create the query again, it will fill in the missing element from the limit buffer const queryAgain = collection.find(query.mangoQuery); queryAgain.enableLimitBuffer(5).enablePersistentQueryCache(cache); @@ -1926,7 +1966,7 @@ describe('rx-query.test.ts', () => { // Persist WITH the limit buffer enabled, but only one doc query.enableLimitBuffer(1).enablePersistentQueryCache(cache); await query.exec(); - uncacheRxQuery(collection._queryCache, query); + simulateNewSession(collection); // remove two of the items from the query results await collection.find({ selector: { passportId: '1' } }).update({ @@ -1960,7 +2000,7 @@ describe('rx-query.test.ts', () => { // Persist WITH the limit buffer enabled query.enableLimitBuffer(3).enablePersistentQueryCache(cache); await query.exec(); - uncacheRxQuery(collection._queryCache, query); + simulateNewSession(collection); // delete one item from the results, and one item from the limit buffer: await collection.find({ selector: { passportId: '1' } }).remove(); @@ -1998,7 +2038,7 @@ describe('rx-query.test.ts', () => { await query.exec(); - uncacheRxQuery(collection._queryCache, query); + simulateNewSession(collection); // Let's make 3 changes: // 1. remove both of the original results @@ -2035,13 +2075,13 @@ describe('rx-query.test.ts', () => { collection.database.destroy(); }); - it('limit buffer enabled, all items in buffer used but we have more non-buffer items', async () => { + it('limit buffer enabled, all items in buffer used but we have more matching non-buffer items', async () => { const { collection, query, cache} = await setUpLimitBufferSituation(); // Persist WITH the limit buffer enabled query.enableLimitBuffer(2).enablePersistentQueryCache(cache); await query.exec(); - uncacheRxQuery(collection._queryCache, query); + simulateNewSession(collection); // remove the 2 results, so we use up the 2 items in the limit buffer: await collection.find({ selector: { passportId: '1' } }).remove(); From 4d0b93628a7b21e5d8db0858cc4c0d8db8b3d9e1 Mon Sep 17 00:00:00 2001 From: Tristan Homsi Date: Sat, 9 Dec 2023 20:42:21 -0500 Subject: [PATCH 06/11] move check for querycache is array up a level, simplifying --- src/rx-query.ts | 202 ++++++++++++++++++++++++------------------------ 1 file changed, 100 insertions(+), 102 deletions(-) diff --git a/src/rx-query.ts b/src/rx-query.ts index 152fe1aca5b..59a8588aa1e 100644 --- a/src/rx-query.ts +++ b/src/rx-query.ts @@ -548,7 +548,7 @@ export class RxQueryBase< const persistentQueryId = this.persistentQueryId(); const value = await this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}`); - if (!value) { + if (!value || !Array.isArray(value) || value.length === 0) { // eslint-disable-next-line no-console console.log(`no persistent query cache found in the backend, returning early ${this.toString()}`); return; @@ -567,121 +567,119 @@ export class RxQueryBase< this._persistentQueryCacheResult = value ?? undefined; this._persistentQueryCacheResultLwt = lwt ?? undefined; - // if this is a regular query, also load documents into cache - if (Array.isArray(value) && value.length > 0) { - const persistedQueryCacheIds = new Set(this._persistentQueryCacheResult); + const persistedQueryCacheIds = new Set(this._persistentQueryCacheResult); - let docsData: RxDocumentData[] = []; - const changedDocIds: Set = new Set(); + let docsData: RxDocumentData[] = []; + const changedDocIds: Set = new Set(); - // query all docs updated > last persisted, limit to an arbitrary 1_000_000 (10x of what we consider our largest library) - const {documents: changedDocs} = await this.collection.storageInstance.getChangedDocumentsSince( - 1_000_000, - // make sure we remove the monotonic clock (xxx.01, xxx.02) from the lwt timestamp to avoid issues with - // lookups in indices (dexie) - {id: '', lwt: Math.floor(Number(lwt)) - UPDATE_DRIFT} - ); - - for (const changedDoc of changedDocs) { - const docWasInOldPersistedResults = persistedQueryCacheIds.has(changedDoc[primaryPath] as string); - const docMatchesNow = this.doesDocumentDataMatch(changedDoc); - - if (docWasInOldPersistedResults && !docMatchesNow && this.mangoQuery.limit && !limitBufferIds?.length) { - // Unfortunately if any doc was removed from the results since the last result, - // there is no way for us to be sure our calculated results are correct. - // So we should simply give up and re-exec the query. - this._persistentQueryCacheResult = value ?? undefined; - this._persistentQueryCacheResultLwt = lwt ?? undefined; - return; - } - - if (docWasInOldPersistedResults) { - /* - * no need to fetch again, we already got the doc from the list of changed docs, and therefore we filter - * out docs that are no longer matching the query. - */ - persistedQueryCacheIds.delete(changedDoc[primaryPath] as string); - } - - // ignore deleted docs or docs that do not match the query - if (!docMatchesNow) { - continue; - } - - // add to document cache - this.collection._docCache.getCachedRxDocument(changedDoc); - - // add to docs - docsData.push(changedDoc); - changedDocIds.add(changedDoc[primaryPath] as string); - } + // query all docs updated > last persisted, limit to an arbitrary 1_000_000 (10x of what we consider our largest library) + const {documents: changedDocs} = await this.collection.storageInstance.getChangedDocumentsSince( + 1_000_000, + // make sure we remove the monotonic clock (xxx.01, xxx.02) from the lwt timestamp to avoid issues with + // lookups in indices (dexie) + {id: '', lwt: Math.floor(Number(lwt)) - UPDATE_DRIFT} + ); - // Get the rest of the doc ids we need to consider: - const moreDocIdsToConsider = new Set(Array.from(persistedQueryCacheIds).concat(limitBufferIds ?? [])); + for (const changedDoc of changedDocs) { + const docWasInOldPersistedResults = persistedQueryCacheIds.has(changedDoc[primaryPath] as string); + const docMatchesNow = this.doesDocumentDataMatch(changedDoc); - const nonRestoredDocIds: string[] = []; - for (const docId of moreDocIdsToConsider) { - if (changedDocIds.has(docId)) { - // we already fetched this doc because it changed, don't get it again - continue; - } + if (docWasInOldPersistedResults && !docMatchesNow && this.mangoQuery.limit && !limitBufferIds?.length) { + // Unfortunately if any doc was removed from the results since the last result, + // there is no way for us to be sure our calculated results are correct. + // So we should simply give up and re-exec the query. + this._persistentQueryCacheResult = value ?? undefined; + this._persistentQueryCacheResultLwt = lwt ?? undefined; + return; + } + + if (docWasInOldPersistedResults) { + /* + * no need to fetch again, we already got the doc from the list of changed docs, and therefore we filter + * out docs that are no longer matching the query. + */ + persistedQueryCacheIds.delete(changedDoc[primaryPath] as string); + } + + // ignore deleted docs or docs that do not match the query + if (!docMatchesNow) { + continue; + } + + // add to document cache + this.collection._docCache.getCachedRxDocument(changedDoc); + + // add to docs + docsData.push(changedDoc); + changedDocIds.add(changedDoc[primaryPath] as string); + } - // first try to fill from docCache - const docData = this.collection._docCache.getLatestDocumentDataIfExists(docId); - if (docData && this.doesDocumentDataMatch(docData)) { - docsData.push(docData); - } + // Get the rest of the doc ids we need to consider: + const moreDocIdsToConsider = new Set(Array.from(persistedQueryCacheIds).concat(limitBufferIds ?? [])); - if (!docData) { - nonRestoredDocIds.push(docId); - } + const nonRestoredDocIds: string[] = []; + for (const docId of moreDocIdsToConsider) { + if (changedDocIds.has(docId)) { + // we already fetched this doc because it changed, don't get it again + continue; } - // otherwise get from storage - if (nonRestoredDocIds.length > 0) { - const docsMap = await this.collection.storageInstance.findDocumentsById(nonRestoredDocIds, false); - Object.values(docsMap).forEach(docData => { - if (this.doesDocumentDataMatch(docData)) { - this.collection._docCache.getCachedRxDocument(docData); - docsData.push(docData); - } - }); + // first try to fill from docCache + const docData = this.collection._docCache.getLatestDocumentDataIfExists(docId); + if (docData && this.doesDocumentDataMatch(docData)) { + docsData.push(docData); } - const normalizedMangoQuery = normalizeMangoQuery( - this.collection.schema.jsonSchema, - this.mangoQuery - ); - const sortComparator = getSortComparator(this.collection.schema.jsonSchema, normalizedMangoQuery); - const limit = normalizedMangoQuery.limit ? normalizedMangoQuery.limit : Infinity; - docsData = docsData.sort(sortComparator); - - const pastLimitItems = docsData.slice(limit); - const finalResults = docsData.slice(0, limit); - - // If we had a limit buffer before, and now we don't... it means the limit buffer was exhausted. - // To be confident we're not missing anything, we need to re-exec the query: - if (limitBufferIds?.length && pastLimitItems.length === 0) { - this._persistentQueryCacheResult = value ?? undefined; - this._persistentQueryCacheResultLwt = lwt ?? undefined; - return; + if (!docData) { + nonRestoredDocIds.push(docId); } - // If there are still items past the first LIMIT items, try to restore the limit buffer with them: - if (limitBufferIds?.length && pastLimitItems.length > 0) { - const lastLimitBufferIndex = pastLimitItems.findLastIndex((d) => limitBufferIds.includes(d[primaryPath] as string)); - if (lastLimitBufferIndex !== -1){ - // If the limit buffer still has room, simply restore it: - this._limitBufferResults = pastLimitItems.slice(0, Math.max(lastLimitBufferIndex + 1, this._limitBufferSize ?? 0)); - } else { - this._limitBufferResults = []; + } + + // otherwise get from storage + if (nonRestoredDocIds.length > 0) { + const docsMap = await this.collection.storageInstance.findDocumentsById(nonRestoredDocIds, false); + Object.values(docsMap).forEach(docData => { + if (this.doesDocumentDataMatch(docData)) { + this.collection._docCache.getCachedRxDocument(docData); + docsData.push(docData); } - } + }); + } - // get query into the correct state - this._lastEnsureEqual = now(); - this._latestChangeEvent = this.collection._changeEventBuffer.counter; - this._setResultData(finalResults); + const normalizedMangoQuery = normalizeMangoQuery( + this.collection.schema.jsonSchema, + this.mangoQuery + ); + const sortComparator = getSortComparator(this.collection.schema.jsonSchema, normalizedMangoQuery); + const limit = normalizedMangoQuery.limit ? normalizedMangoQuery.limit : Infinity; + docsData = docsData.sort(sortComparator); + + const pastLimitItems = docsData.slice(limit); + const finalResults = docsData.slice(0, limit); + + // If we had a limit buffer before, and now we don't... it means the limit buffer was exhausted. + // To be confident we're not missing anything, we need to re-exec the query: + if (limitBufferIds?.length && pastLimitItems.length === 0) { + this._persistentQueryCacheResult = value ?? undefined; + this._persistentQueryCacheResultLwt = lwt ?? undefined; + return; } + // If there are still items past the first LIMIT items, try to restore the limit buffer with them: + if (limitBufferIds?.length && pastLimitItems.length > 0) { + const lastLimitBufferIndex = pastLimitItems.findLastIndex((d) => limitBufferIds.includes(d[primaryPath] as string)); + if (lastLimitBufferIndex !== -1){ + // If the limit buffer still has room, simply restore it: + this._limitBufferResults = pastLimitItems.slice(0, Math.max(lastLimitBufferIndex + 1, this._limitBufferSize ?? 0)); + } else { + this._limitBufferResults = []; + } + } + + // get query into the correct state + this._lastEnsureEqual = now(); + this._latestChangeEvent = this.collection._changeEventBuffer.counter; + this._setResultData(finalResults); + // eslint-disable-next-line no-console console.timeEnd(`Restoring persistent querycache ${this.toString()}`); From e7f46f6760f0ea707d32eac72b323d92feaff71f Mon Sep 17 00:00:00 2001 From: Tristan Homsi Date: Sun, 10 Dec 2023 15:04:50 -0500 Subject: [PATCH 07/11] serious refactor to dramatically simplify (and make more efficient + correct) the restoration logic. and more tests for it. --- src/rx-query.ts | 131 +++++++++++++------------------------ test/unit/rx-query.test.ts | 65 ++++++++++++++++-- 2 files changed, 108 insertions(+), 88 deletions(-) diff --git a/src/rx-query.ts b/src/rx-query.ts index 59a8588aa1e..698bf85ff75 100644 --- a/src/rx-query.ts +++ b/src/rx-query.ts @@ -535,7 +535,6 @@ export class RxQueryBase< // no cache backend provided, do nothing return; } - if (this._persistentQueryCacheResult) { // we already restored the cache once, no need to run twice return; @@ -553,25 +552,27 @@ export class RxQueryBase< console.log(`no persistent query cache found in the backend, returning early ${this.toString()}`); return; } + // eslint-disable-next-line no-console console.time(`Restoring persistent querycache ${this.toString()}`); + const primaryPath = this.collection.schema.primaryPath; const results = await Promise.all([ this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lwt`), this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lb`), ]); - const [lwt, limitBufferIds] = [results[0] as string | null, results[1] as string[] | null]; + const lwt = results[0] as string | null; + const limitBufferIds = new Set(results[1] ?? []); - const primaryPath = this.collection.schema.primaryPath; + if (!lwt) { + return; + } - this._persistentQueryCacheResult = value ?? undefined; - this._persistentQueryCacheResultLwt = lwt ?? undefined; + this._persistentQueryCacheResult = value; + this._persistentQueryCacheResultLwt = lwt; const persistedQueryCacheIds = new Set(this._persistentQueryCacheResult); - let docsData: RxDocumentData[] = []; - const changedDocIds: Set = new Set(); - // query all docs updated > last persisted, limit to an arbitrary 1_000_000 (10x of what we consider our largest library) const {documents: changedDocs} = await this.collection.storageInstance.getChangedDocumentsSince( 1_000_000, @@ -580,72 +581,24 @@ export class RxQueryBase< {id: '', lwt: Math.floor(Number(lwt)) - UPDATE_DRIFT} ); - for (const changedDoc of changedDocs) { - const docWasInOldPersistedResults = persistedQueryCacheIds.has(changedDoc[primaryPath] as string); - const docMatchesNow = this.doesDocumentDataMatch(changedDoc); + const changedDocIds = new Set(changedDocs.map((d) => d[primaryPath] as string)); - if (docWasInOldPersistedResults && !docMatchesNow && this.mangoQuery.limit && !limitBufferIds?.length) { - // Unfortunately if any doc was removed from the results since the last result, - // there is no way for us to be sure our calculated results are correct. - // So we should simply give up and re-exec the query. - this._persistentQueryCacheResult = value ?? undefined; - this._persistentQueryCacheResultLwt = lwt ?? undefined; - return; - } - - if (docWasInOldPersistedResults) { - /* - * no need to fetch again, we already got the doc from the list of changed docs, and therefore we filter - * out docs that are no longer matching the query. - */ - persistedQueryCacheIds.delete(changedDoc[primaryPath] as string); - } - - // ignore deleted docs or docs that do not match the query - if (!docMatchesNow) { - continue; - } - - // add to document cache - this.collection._docCache.getCachedRxDocument(changedDoc); - - // add to docs - docsData.push(changedDoc); - changedDocIds.add(changedDoc[primaryPath] as string); - } - - // Get the rest of the doc ids we need to consider: - const moreDocIdsToConsider = new Set(Array.from(persistedQueryCacheIds).concat(limitBufferIds ?? [])); - - const nonRestoredDocIds: string[] = []; - for (const docId of moreDocIdsToConsider) { - if (changedDocIds.has(docId)) { - // we already fetched this doc because it changed, don't get it again - continue; - } + const docIdsWeNeedToFetch = [...persistedQueryCacheIds, ...limitBufferIds].filter((id) => !changedDocIds.has(id)); - // first try to fill from docCache - const docData = this.collection._docCache.getLatestDocumentDataIfExists(docId); - if (docData && this.doesDocumentDataMatch(docData)) { - docsData.push(docData); - } + // We use _queryCollectionByIds to fetch the remaining docs we need efficiently, pulling + // from query cache if we can (and the storageInstance by ids if we can't): + const otherPotentialMatchingDocs: RxDocumentData[] = []; + await _queryCollectionByIds(this as any, otherPotentialMatchingDocs, docIdsWeNeedToFetch); - if (!docData) { - nonRestoredDocIds.push(docId); + // Now that we have all potential documents, we just filter (in-memory) the ones that still match our query: + let docsData: RxDocumentData[] = []; + for (const doc of changedDocs.concat(otherPotentialMatchingDocs)) { + if (this.doesDocumentDataMatch(doc)) { + docsData.push(doc); } } - // otherwise get from storage - if (nonRestoredDocIds.length > 0) { - const docsMap = await this.collection.storageInstance.findDocumentsById(nonRestoredDocIds, false); - Object.values(docsMap).forEach(docData => { - if (this.doesDocumentDataMatch(docData)) { - this.collection._docCache.getCachedRxDocument(docData); - docsData.push(docData); - } - }); - } - + // Sort the documents by the query's sort field: const normalizedMangoQuery = normalizeMangoQuery( this.collection.schema.jsonSchema, this.mangoQuery @@ -654,25 +607,36 @@ export class RxQueryBase< const limit = normalizedMangoQuery.limit ? normalizedMangoQuery.limit : Infinity; docsData = docsData.sort(sortComparator); + // We know for sure that all persisted and limit buffer ids (and changed docs before them) are in the correct + // result set. And we can't be sure about any past that point. So cut it off there: + const lastValidIndex = docsData.findLastIndex((d) => limitBufferIds.has(d[primaryPath] as string) || persistedQueryCacheIds.has(d[primaryPath] as string)); + docsData = docsData.slice(0, lastValidIndex + 1); + + // Now this is the trickiest part. + // If we somehow have fewer docs than the limit of our query + // (and this wasn't the case because before persistence) + // then there is no way for us to know the correct results, and we re-exec: + const unchangedItemsMayNowBeInResults = ( + this.mangoQuery.limit && + docsData.length < this.mangoQuery.limit && + persistedQueryCacheIds.size >= this.mangoQuery.limit + ); + if (unchangedItemsMayNowBeInResults) { + this._persistentQueryCacheResult = undefined; + this._persistentQueryCacheResultLwt = undefined; + return; + } + + // Our finalResults are the actual results of this query, and pastLimitItems are any remaining matching + // documents we have left over (past the limit). const pastLimitItems = docsData.slice(limit); const finalResults = docsData.slice(0, limit); - // If we had a limit buffer before, and now we don't... it means the limit buffer was exhausted. - // To be confident we're not missing anything, we need to re-exec the query: - if (limitBufferIds?.length && pastLimitItems.length === 0) { - this._persistentQueryCacheResult = value ?? undefined; - this._persistentQueryCacheResultLwt = lwt ?? undefined; - return; - } // If there are still items past the first LIMIT items, try to restore the limit buffer with them: - if (limitBufferIds?.length && pastLimitItems.length > 0) { - const lastLimitBufferIndex = pastLimitItems.findLastIndex((d) => limitBufferIds.includes(d[primaryPath] as string)); - if (lastLimitBufferIndex !== -1){ - // If the limit buffer still has room, simply restore it: - this._limitBufferResults = pastLimitItems.slice(0, Math.max(lastLimitBufferIndex + 1, this._limitBufferSize ?? 0)); - } else { - this._limitBufferResults = []; - } + if (limitBufferIds.size && pastLimitItems.length > 0) { + this._limitBufferResults = pastLimitItems; + } else { + this._limitBufferResults = []; } // get query into the correct state @@ -682,7 +646,6 @@ export class RxQueryBase< // eslint-disable-next-line no-console console.timeEnd(`Restoring persistent querycache ${this.toString()}`); - } } diff --git a/test/unit/rx-query.test.ts b/test/unit/rx-query.test.ts index 8854f0993ef..5e2265330ea 100644 --- a/test/unit/rx-query.test.ts +++ b/test/unit/rx-query.test.ts @@ -1769,7 +1769,7 @@ describe('rx-query.test.ts', () => { const result2 = await query2.exec(); assert.strictEqual(query1._execOverDatabaseCount, 0); - assert.strictEqual(query2._execOverDatabaseCount, 1); + assert.strictEqual(query2._execOverDatabaseCount, 0); assert.deepStrictEqual(result2.map(item => item.passportId), ['1', '3']); collection.database.destroy(); @@ -1844,7 +1844,7 @@ describe('rx-query.test.ts', () => { // wait 1 second so that not all docs are included in lwt await new Promise((resolve) => { - setTimeout(resolve, 1000); + setTimeout(resolve, 500); }); // Cache a limited query: @@ -2016,16 +2016,17 @@ describe('rx-query.test.ts', () => { assert.deepStrictEqual(updatedResults.map(h => h.passportId), ['2', '4']); assert.strictEqual(queryAgain._limitBufferResults?.length, 1); - // But if we go further, and use the last item from the limit buffer, we'll have to re-exec: + // But if we go further, and use the last items from the limit buffer, we'll have to re-exec: uncacheRxQuery(collection._queryCache, queryAgain); await collection.find({ selector: { passportId: '4' } }).remove(); + await collection.find({ selector: { passportId: '5' } }).remove(); const queryFinal = collection.find(query.mangoQuery); queryFinal.enableLimitBuffer(3).enablePersistentQueryCache(cache); const finalResults = await queryFinal.exec(); assert.strictEqual(queryFinal._execOverDatabaseCount, 1); - assert.deepStrictEqual(finalResults.map(h => h.passportId), ['2', '5']); + assert.deepStrictEqual(finalResults.map(h => h.passportId), ['2']); collection.database.destroy(); }); @@ -2107,6 +2108,62 @@ describe('rx-query.test.ts', () => { // how they would be sorted relative to other documents assert.strictEqual(queryAgain._limitBufferResults?.length, 0); + simulateNewSession(collection); + + // If one more doc is removed from our results, we will HAVE to re-exec to ensure + // correct results, test that: + await collection.find({ selector: { passportId: '3' } }).update({ + $set: { age: 1 } + }); + + const queryFinal = collection.find(query.mangoQuery); + queryFinal.enableLimitBuffer(2).enablePersistentQueryCache(cache); + + const finalResults = await queryFinal.exec(); + + // Query re-execs, and gives correct results: + assert.strictEqual(queryFinal._execOverDatabaseCount, 1); + assert.deepStrictEqual(finalResults.map(h => h.passportId), ['4', '5']); + + // When we re-exec, the limit buffer will also get filled: + assert.deepStrictEqual(queryFinal._limitBufferResults?.map(h => h.passportId), ['6', '7']); + + collection.database.destroy(); + }); + + it('Handles case where we have fewer than LIMIT matches', async () => { + const { collection, cache } = await setUpLimitBufferSituation(); + + const query = collection.find({ limit: 3, sort: [{age: 'asc'}], selector: { age: { $lt: 45 } } }); + query.enableLimitBuffer(2).enablePersistentQueryCache(cache); + await query.exec(); + simulateNewSession(collection); + + // Remove something, still correct and no-re-exec + await collection.find({ selector: { passportId: '1' } }).remove(); + + const queryRemoved = collection.find(query.mangoQuery); + queryRemoved.enableLimitBuffer(2).enablePersistentQueryCache(cache); + const removedResults = await queryRemoved.exec(); + assert.strictEqual(queryRemoved._execOverDatabaseCount, 0); + assert.deepStrictEqual(removedResults.map(h => h.passportId), ['2']); + + simulateNewSession(collection); + + // Now add some matching docs. Since they change, they should now be in results with no re-exec. + await collection.find({ selector: { passportId: '5' } }).update({ + $set: { age: 1 } + }); + await collection.bulkUpsert([ + schemaObjects.human('6', 2), + schemaObjects.human('7', 3), + ]); + const queryAdded = collection.find(query.mangoQuery); + queryAdded.enableLimitBuffer(2).enablePersistentQueryCache(cache); + const addedResults = await queryRemoved.exec(); + assert.strictEqual(queryAdded._execOverDatabaseCount, 0); + assert.deepStrictEqual(addedResults.map(h => h.passportId), ['5', '6', '7']); + collection.database.destroy(); }); }); From 608adc0581a6a3a2365613a010f3b91e906f6b52 Mon Sep 17 00:00:00 2001 From: Tristan Homsi Date: Sun, 10 Dec 2023 15:19:10 -0500 Subject: [PATCH 08/11] stop re-persisting when limit results are removed --- src/rx-query.ts | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/rx-query.ts b/src/rx-query.ts index 698bf85ff75..241e9363672 100644 --- a/src/rx-query.ts +++ b/src/rx-query.ts @@ -809,16 +809,6 @@ async function __ensureEqual(rxQuery: RxQueryBase): Promis // we got the new results, we do not have to re-execute, mustReExec stays false ret = true; // true because results changed rxQuery._setResultData(eventReduceResult.newResults as any); - - /* - * We usually want to persist the cache every time there is an update to the query to guarantee - * correctness. Cache persistence has some "cost", and we therefore try to optimize the number of - * writes. - * So, if any item in the result set was removed, we re-persist the query. - */ - if (rxQuery.mangoQuery.limit && eventReduceResult.limitResultsRemoved) { - await updatePersistentQueryCache(rxQuery); - } } } } From bcfec8b5bc45a3bebdf9f1fa88b74302fa689274 Mon Sep 17 00:00:00 2001 From: Tristan Homsi Date: Sun, 10 Dec 2023 15:20:31 -0500 Subject: [PATCH 09/11] remove leftover count logic we no longer use --- src/rx-query.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/rx-query.ts b/src/rx-query.ts index 241e9363672..7f6568d07fc 100644 --- a/src/rx-query.ts +++ b/src/rx-query.ts @@ -280,12 +280,6 @@ export class RxQueryBase< if (this.op === 'count') { - // if we have a persisted query cache result, use the result - if (this._persistentQueryCacheResult) { - // TODO: correct this number, but how? - return Number(this._persistentQueryCacheResult); - } - const preparedQuery = this.getPreparedQuery(); const result = await this.collection.storageInstance.count(preparedQuery); if (result.mode === 'slow' && !this.collection.database.allowSlowCount) { From efb9c9f11325172a2c5895d99c82be53587c0e3e Mon Sep 17 00:00:00 2001 From: Tristan Homsi Date: Sun, 10 Dec 2023 16:17:02 -0500 Subject: [PATCH 10/11] remove a cache read+write for each query by storing limit buffer with the persistent ids --- src/rx-query.ts | 55 +++++++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/src/rx-query.ts b/src/rx-query.ts index 7f6568d07fc..a7e399c5bae 100644 --- a/src/rx-query.ts +++ b/src/rx-query.ts @@ -533,12 +533,12 @@ export class RxQueryBase< // we already restored the cache once, no need to run twice return; } - if (this.mangoQuery.skip || this.op === 'count') { console.error('The persistent query cache only works on non-skip, non-count queries.'); return; } + // First, check if there are any query results persisted: const persistentQueryId = this.persistentQueryId(); const value = await this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}`); if (!value || !Array.isArray(value) || value.length === 0) { @@ -547,25 +547,29 @@ export class RxQueryBase< return; } + // If there are persisted ids, create our two Sets of ids from the cache: + const persistedQueryCacheIds = new Set(); + const limitBufferIds = new Set(); + + for (const id of value) { + if (id.startsWith('lb-')) { + limitBufferIds.add(id.replace('lb-', '')); + } else { + persistedQueryCacheIds.add(id); + } + } + // eslint-disable-next-line no-console console.time(`Restoring persistent querycache ${this.toString()}`); - const primaryPath = this.collection.schema.primaryPath; - const results = await Promise.all([ - this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lwt`), - this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lb`), - ]); - const lwt = results[0] as string | null; - const limitBufferIds = new Set(results[1] ?? []); - + // Next, pull the lwt from the cache: + // TODO: if lwt is too old, should we just give up here? What if there are too many changedDocs? + const lwt = (await this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lwt`)) as string | null; if (!lwt) { return; } - this._persistentQueryCacheResult = value; - this._persistentQueryCacheResultLwt = lwt; - - const persistedQueryCacheIds = new Set(this._persistentQueryCacheResult); + const primaryPath = this.collection.schema.primaryPath; // query all docs updated > last persisted, limit to an arbitrary 1_000_000 (10x of what we consider our largest library) const {documents: changedDocs} = await this.collection.storageInstance.getChangedDocumentsSince( @@ -616,8 +620,6 @@ export class RxQueryBase< persistedQueryCacheIds.size >= this.mangoQuery.limit ); if (unchangedItemsMayNowBeInResults) { - this._persistentQueryCacheResult = undefined; - this._persistentQueryCacheResultLwt = undefined; return; } @@ -633,7 +635,7 @@ export class RxQueryBase< this._limitBufferResults = []; } - // get query into the correct state + // Finally, set the query's results to what we've pulled from disk: this._lastEnsureEqual = now(); this._latestChangeEvent = this.collection._changeEventBuffer.counter; this._setResultData(finalResults); @@ -859,26 +861,25 @@ async function updatePersistentQueryCache(rxQuery: RxQueryBase { + idsToPersist.push(`lb-${d[rxQuery.collection.schema.primaryPath]}`); + }); + } // eslint-disable-next-line no-console console.time(`Query persistence: persisting results of ${JSON.stringify(rxQuery.mangoQuery)}`); // persist query cache const lwt = rxQuery._result?.time ?? RX_META_LWT_MINIMUM; - const promises = [ - backend.setItem(`qc:${String(key)}`, value), + await Promise.all([ + backend.setItem(`qc:${String(key)}`, idsToPersist), backend.setItem(`qc:${String(key)}:lwt`, lwt.toString()), - ]; - if (rxQuery._limitBufferResults) { - const limitBufferIds = rxQuery._limitBufferResults.map((d) => d[rxQuery.collection.schema.primaryPath] as string); - promises.push(backend.setItem(`qc:${String(key)}:lb`, limitBufferIds)); - } - - await Promise.all(promises); + ]); // eslint-disable-next-line no-console console.timeEnd(`Query persistence: persisting results of ${JSON.stringify(rxQuery.mangoQuery)}`); From 17822a42243143d592b104695105719a8aedb304 Mon Sep 17 00:00:00 2001 From: Tristan Homsi Date: Sun, 10 Dec 2023 16:23:17 -0500 Subject: [PATCH 11/11] fix the cache size checks in tests now that we're down to 2 keys --- test/unit/rx-query.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/unit/rx-query.test.ts b/test/unit/rx-query.test.ts index 5e2265330ea..9893190c754 100644 --- a/test/unit/rx-query.test.ts +++ b/test/unit/rx-query.test.ts @@ -1604,7 +1604,7 @@ describe('rx-query.test.ts', () => { await collection.bulkInsert([human1, human2]); await query.exec(); - assert.strictEqual(cache.size, 3); + assert.strictEqual(cache.size, 2); collection.database.destroy(); }); @@ -1900,7 +1900,7 @@ describe('rx-query.test.ts', () => { const originalResults = await query.exec(); assert.deepStrictEqual(originalResults.map(h => h.passportId), ['1', '2']); assert.strictEqual(query._limitBufferResults?.length, 3); - assert.strictEqual(cache.size, 3); + assert.strictEqual(cache.size, 2); // remove one of the items from the query results await collection.find({ selector: { passportId: '1' } }).update({ @@ -1937,7 +1937,7 @@ describe('rx-query.test.ts', () => { // uncache the query first, before changes are made simulateNewSession(collection); - assert.strictEqual(cache.size, 3); + assert.strictEqual(cache.size, 2); // remove one of the items from the query results while query is not listening in memory await collection.find({ selector: { passportId: '1' } }).update({