From 8aeb97180041a10339dd4320966fbc0bfd7def47 Mon Sep 17 00:00:00 2001 From: Hannes Moser Date: Fri, 24 Nov 2023 23:47:44 +0100 Subject: [PATCH] Add query cache persistence improve limitResultsRemoved check to be as minimal as possible Use the default hash function from the database instance This guarantees a working hash function on every platform (web, mobile). --- src/event-reduce.ts | 19 ++- src/rx-query.ts | 278 +++++++++++++++++++++++++++++++------ test/helper/cache.ts | 31 +++++ test/unit/rx-query.test.ts | 252 +++++++++++++++++++++++++++++++++ 4 files changed, 539 insertions(+), 41 deletions(-) create mode 100644 test/helper/cache.ts diff --git a/src/event-reduce.ts b/src/event-reduce.ts index 6761264f6ff..8efb6ff2ffc 100644 --- a/src/event-reduce.ts +++ b/src/event-reduce.ts @@ -43,6 +43,7 @@ export type EventReduceResultPos = { runFullQueryAgain: false; changed: boolean; newResults: RxDocumentType[]; + limitResultsRemoved: boolean; }; export type EventReduceResult = EventReduceResultNeg | EventReduceResultPos; @@ -156,6 +157,17 @@ function canFillResultSetFromLimitBuffer(s: StateResolveFunction ); } + +function actionRemovesItemFromResults(action: ActionName): boolean { + return [ + 'removeFirstItem', + 'removeLastItem', + 'removeExisting', + 'runFullQueryAgain', + ].includes(action); +} + + export function calculateNewResults( rxQuery: RxQuery, rxChangeEvents: RxChangeEvent[] @@ -169,6 +181,7 @@ export function calculateNewResults( const previousResults: RxDocumentType[] = ensureNotFalsy(rxQuery._result).docsData.slice(0); const previousResultsMap: Map = ensureNotFalsy(rxQuery._result).docsDataMap; let changed: boolean = false; + let limitResultsRemoved: boolean = false; const eventReduceEvents: ChangeEvent[] = rxChangeEvents .map(cE => rxChangeEventToEventReduceChangeEvent(cE)) @@ -217,6 +230,9 @@ export function calculateNewResults( previousResults, previousResultsMap ); + if (actionRemovesItemFromResults(actionName)) { + limitResultsRemoved = true; + } return false; } }); @@ -228,7 +244,8 @@ export function calculateNewResults( return { runFullQueryAgain: false, changed, - newResults: previousResults + newResults: previousResults, + limitResultsRemoved, }; } } diff --git a/src/rx-query.ts b/src/rx-query.ts index 0dc319b4137..d7f0c7b10ce 100644 --- a/src/rx-query.ts +++ b/src/rx-query.ts @@ -13,15 +13,15 @@ import { shareReplay } from 'rxjs/operators'; import { - sortObject, - stringifyFilter, - pluginMissing, - overwriteGetterForCaching, + areRxDocumentArraysEqual, + ensureNotFalsy, now, - PROMISE_RESOLVE_FALSE, + overwriteGetterForCaching, + pluginMissing, + PROMISE_RESOLVE_FALSE, RX_META_LWT_MINIMUM, RXJS_SHARE_REPLAY_DEFAULTS, - ensureNotFalsy, - areRxDocumentArraysEqual + sortObject, + stringifyFilter } from './plugins/utils'; import { newRxError @@ -30,28 +30,36 @@ import { runPluginHooks } from './hooks'; import type { - RxCollection, - RxDocument, - RxQueryOP, - RxQuery, MangoQuery, - MangoQuerySortPart, MangoQuerySelector, + MangoQuerySortPart, PreparedQuery, + QueryMatcher, RxChangeEvent, - RxDocumentWriteData, + RxCollection, + RxDocument, RxDocumentData, - QueryMatcher + RxDocumentWriteData, + RxQuery, + RxQueryOP } from './types'; import { calculateNewResults } from './event-reduce'; import { triggerCacheReplacement } from './query-cache'; -import { getQueryMatcher, normalizeMangoQuery } from './rx-query-helper'; +import { getQueryMatcher, getSortComparator, normalizeMangoQuery } from './rx-query-helper'; + +export interface QueryCacheBackend { + getItem(key: string): Promise; + setItem(key: string, value: T): Promise; +} let _queryCount = 0; const newQueryID = function (): number { return ++_queryCount; }; +// allow changes to be 100ms older than the actual lwt value +const UPDATE_DRIFT = 100; + export class RxQueryBase< RxDocType, // TODO also pass DocMethods here @@ -85,6 +93,7 @@ export class RxQueryBase< docsData: RxDocumentData[]; // A key->document map, used in the event reduce optimization. docsDataMap: Map; + docsKeys: string[]; docsMap: Map>; docs: RxDocument[]; count: number; @@ -190,6 +199,12 @@ export class RxQueryBase< public _limitBufferSize: number | null = null; public _limitBufferResults: RxDocumentData[] | null = null; + // Fields used for the persistent query cache when enabled: + public _persistentQueryCacheResult?: string[] | string = undefined; + public _persistentQueryCacheResultLwt?: string = undefined; // lwt = latest write time + public _persistentQueryCacheLoaded?: Promise; + public _persistentQueryCacheBackend?: QueryCacheBackend; + /** * ensures that the exec-runs * are not run in parallel @@ -213,6 +228,7 @@ export class RxQueryBase< if (typeof newResultData === 'number') { this._result = { docsData: [], + docsKeys: [], docsMap: new Map(), docsDataMap: new Map(), count: newResultData, @@ -235,14 +251,17 @@ export class RxQueryBase< * we directly use the objects that are stored in the RxDocument * to ensure we do not store the same data twice and fill up the memory. */ + const docsKeys: string[] = []; const docsData = docs.map(doc => { docsDataMap.set(doc.primary, doc._data); docsMap.set(doc.primary, doc); + docsKeys.push(doc.primary); return doc._data; }); this._result = { docsData, + docsKeys, docsMap, docsDataMap, count: docsData.length, @@ -261,6 +280,12 @@ export class RxQueryBase< if (this.op === 'count') { + // if we have a persisted query cache result, use the result + if (this._persistentQueryCacheResult) { + // TODO: correct this number, but how? + return Number(this._persistentQueryCacheResult); + } + const preparedQuery = this.getPreparedQuery(); const result = await this.collection.storageInstance.count(preparedQuery); if (result.mode === 'slow' && !this.collection.database.allowSlowCount) { @@ -382,6 +407,10 @@ export class RxQueryBase< return value; } + persistentQueryId() { + return String(this.collection.database.hashFunction(this.toString())); + } + /** * returns the prepared query * which can be send to the storage instance to query for documents. @@ -490,6 +519,120 @@ export class RxQueryBase< this._limitBufferSize = bufferSize; return this; } + + enablePersistentQueryCache(backend: QueryCacheBackend) { + this._persistentQueryCacheBackend = backend; + this._persistentQueryCacheLoaded = this._restoreQueryCacheFromPersistedState(); + return this; + } + + private async _restoreQueryCacheFromPersistedState() { + if (!this._persistentQueryCacheBackend) { + // no cache backend provided, do nothing + return; + } + + if (this._persistentQueryCacheResult) { + // we already restored the cache once, no need to run twice + return; + } + + if (this.mangoQuery.skip) { + console.error('The persistent query cache only works on non-skip queries.'); + return; + } + + const persistentQueryId = this.persistentQueryId(); + const value = await this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}`); + if (!value) { + return; + } + + const lwt = (await this._persistentQueryCacheBackend.getItem(`qc:${persistentQueryId}:lwt`)) as string | null; + const primaryPath = this.collection.schema.primaryPath; + + this._persistentQueryCacheResult = value ?? undefined; + this._persistentQueryCacheResultLwt = lwt ?? undefined; + + // if this is a regular query, also load documents into cache + if (Array.isArray(value) && value.length > 0) { + const persistedQueryCacheIds = new Set(this._persistentQueryCacheResult); + + let docsData: RxDocumentData[] = []; + + // query all docs updated > last persisted, limit to an arbitrary 1_000_000 (10x of what we consider our largest library) + const {documents: changedDocs} = await this.collection.storageInstance.getChangedDocumentsSince( + 1_000_000, + // make sure we remove the monotonic clock (xxx.01, xxx.02) from the lwt timestamp to avoid issues with + // lookups in indices (dexie) + {id: '', lwt: Math.floor(Number(lwt)) - UPDATE_DRIFT} + ); + + for (const changedDoc of changedDocs) { + /* + * no need to fetch again, we already got the doc from the list of changed docs, and therefore we filter + * deleted docs as well + */ + persistedQueryCacheIds.delete(changedDoc[primaryPath] as string); + + // ignore deleted docs or docs that do not match the query + if (!this.doesDocumentDataMatch(changedDoc)) { + continue; + } + + // add to document cache + this.collection._docCache.getCachedRxDocument(changedDoc); + + // add to docs + docsData.push(changedDoc); + } + + // fetch remaining persisted doc ids + const nonRestoredDocIds: string[] = []; + for (const docId of persistedQueryCacheIds) { + // first try to fill from docCache + const docData = this.collection._docCache.getLatestDocumentDataIfExists(docId); + if (docData && this.doesDocumentDataMatch(docData)) { + docsData.push(docData); + } + + if (!docData) { + nonRestoredDocIds.push(docId); + } + } + + // otherwise get from storage + if (nonRestoredDocIds.length > 0) { + const docsMap = await this.collection.storageInstance.findDocumentsById(nonRestoredDocIds, false); + Object.values(docsMap).forEach(docData => { + this.collection._docCache.getCachedRxDocument(docData); + docsData.push(docData); + }); + } + + const normalizedMangoQuery = normalizeMangoQuery( + this.collection.schema.jsonSchema, + this.mangoQuery + ); + const sortComparator = getSortComparator(this.collection.schema.jsonSchema, normalizedMangoQuery); + const skip = normalizedMangoQuery.skip ? normalizedMangoQuery.skip : 0; + const limit = normalizedMangoQuery.limit ? normalizedMangoQuery.limit : Infinity; + const skipPlusLimit = skip + limit; + docsData = docsData.sort(sortComparator); + docsData = docsData.slice(skip, skipPlusLimit); + + + // get query into the correct state + this._lastEnsureEqual = now(); + this._latestChangeEvent = this.collection._changeEventBuffer.counter; + this._setResultData(docsData); + } else if (value && Number.isInteger(Number(value))) { + // get query into the correct state + this._lastEnsureEqual = now(); + this._latestChangeEvent = this.collection._changeEventBuffer.counter; + this._setResultData(Number(value)); + } + } } export function _getDefaultQuery(): MangoQuery { @@ -524,6 +667,7 @@ export function createRxQuery( // ensure when created with same params, only one is created ret = tunnelQueryCache(ret); + // TODO: clear persistent query cache as well triggerCacheReplacement(collection); return ret; @@ -563,11 +707,14 @@ function _ensureEqual(rxQuery: RxQueryBase): Promise { return rxQuery._ensureEqualQueue; } + /** * ensures that the results of this query is equal to the results which a query over the database would give * @return true if results have changed */ -function __ensureEqual(rxQuery: RxQueryBase): Promise { +async function __ensureEqual(rxQuery: RxQueryBase): Promise { + await rxQuery._persistentQueryCacheLoaded; + rxQuery._lastEnsureEqual = now(); /** @@ -634,6 +781,7 @@ function __ensureEqual(rxQuery: RxQueryBase): Promise(rxQuery: RxQueryBase): Promise(rxQuery: RxQueryBase): Promise { + await updatePersistentQueryCache(rxQuery); + return returnValue; }); } - return Promise.resolve(ret); // true if results have changed + + return ret; // true if results have changed +} + + +async function updatePersistentQueryCache(rxQuery: RxQueryBase) { + if (!rxQuery._persistentQueryCacheBackend) { + return; + } + + const backend = rxQuery._persistentQueryCacheBackend; + + const isCount = rxQuery._result?.docs.length === 0 && rxQuery._result.count > 0; + + const key = rxQuery.persistentQueryId(); + const value = isCount + ? rxQuery._result?.count?.toString() ?? '0' + : rxQuery._result?.docsKeys ?? []; + + // update _persistedQueryCacheResult + rxQuery._persistentQueryCacheResult = value; + + // eslint-disable-next-line no-console + console.time(`Query persistence: persisting results of ${JSON.stringify(rxQuery.mangoQuery)}`); + // persist query cache + const lwt = rxQuery._result?.time ?? RX_META_LWT_MINIMUM; + await backend.setItem(`qc:${String(key)}`, value); + await backend.setItem(`qc:${String(key)}:lwt`, lwt.toString()); + + // eslint-disable-next-line no-console + console.timeEnd(`Query persistence: persisting results of ${JSON.stringify(rxQuery.mangoQuery)}`); +} + + +// Refactored out of `queryCollection`: modifies the docResults array to fill it with data +async function _queryCollectionByIds(rxQuery: RxQuery | RxQueryBase, docResults: RxDocumentData[], docIds: string[]) { + const collection = rxQuery.collection; + docIds = docIds.filter(docId => { + // first try to fill from docCache + const docData = rxQuery.collection._docCache.getLatestDocumentDataIfExists(docId); + if (docData) { + if (!docData._deleted) { + docResults.push(docData); + } + return false; + } else { + return true; + } + }); + + // otherwise get from storage + if (docIds.length > 0) { + const docsMap = await collection.storageInstance.findDocumentsById(docIds, false); + Object.values(docsMap).forEach(docData => { + docResults.push(docData); + }); + } } /** @@ -700,6 +917,8 @@ function __ensureEqual(rxQuery: RxQueryBase): Promise( rxQuery: RxQuery | RxQueryBase ): Promise[]> { + await rxQuery._persistentQueryCacheLoaded; + let docs: RxDocumentData[] = []; const collection = rxQuery.collection; @@ -711,26 +930,7 @@ export async function queryCollection( */ if (rxQuery.isFindOneByIdQuery) { if (Array.isArray(rxQuery.isFindOneByIdQuery)) { - let docIds = rxQuery.isFindOneByIdQuery; - docIds = docIds.filter(docId => { - // first try to fill from docCache - const docData = rxQuery.collection._docCache.getLatestDocumentDataIfExists(docId); - if (docData) { - if (!docData._deleted) { - docs.push(docData); - } - return false; - } else { - return true; - } - }); - // otherwise get from storage - if (docIds.length > 0) { - const docsMap = await collection.storageInstance.findDocumentsById(docIds, false); - Object.values(docsMap).forEach(docData => { - docs.push(docData); - }); - } + await _queryCollectionByIds(rxQuery, docs, rxQuery.isFindOneByIdQuery); } else { const docId = rxQuery.isFindOneByIdQuery; @@ -758,7 +958,6 @@ export async function queryCollection( docs = queryResult.documents; } return docs; - } /** @@ -804,7 +1003,6 @@ export function isFindOneByIdQuery( } - export function isRxQuery(obj: any): boolean { return obj instanceof RxQueryBase; } diff --git a/test/helper/cache.ts b/test/helper/cache.ts new file mode 100644 index 00000000000..2be9accf43e --- /dev/null +++ b/test/helper/cache.ts @@ -0,0 +1,31 @@ +import {QueryCacheBackend, RxCollection} from '../../src'; + +export class Cache implements QueryCacheBackend { + private readonly items; + + constructor() { + this.items = new Map(); + } + + getItem(key: string) { + return this.items.get(key); + } + + async setItem(key: string, value: T) { + this.items.set(key, value); + return await Promise.resolve(value); + } + + get size() { + return this.items.size; + } + + getItems() { + return this.items; + } +} + +export function clearQueryCache(collection: RxCollection) { + const queryCache = collection._queryCache; + queryCache._map = new Map(); +} diff --git a/test/unit/rx-query.test.ts b/test/unit/rx-query.test.ts index a15de7e9060..83bf6a2f1d7 100644 --- a/test/unit/rx-query.test.ts +++ b/test/unit/rx-query.test.ts @@ -6,6 +6,7 @@ import clone from 'clone'; import * as humansCollection from './../helper/humans-collection'; import * as schemaObjects from '../helper/schema-objects'; import * as schemas from './../helper/schemas'; +import {Cache, clearQueryCache} from '../helper/cache'; import { isRxQuery, @@ -14,6 +15,7 @@ import { promiseWait, randomCouchString, ensureNotFalsy, + now, } from '../../'; import { firstValueFrom } from 'rxjs'; @@ -1582,4 +1584,254 @@ describe('rx-query.test.ts', () => { collection.database.destroy(); }); }); + + async function setUpPersistentQueryCacheCollection() { + const collection = await humansCollection.create(0); + return {collection}; + } + + config.parallel('Persistent Query Cache', () => { + it('query fills cache', async () => { + const {collection} = await setUpPersistentQueryCacheCollection(); + + const query = collection.find({ limit: 1 }); + const cache = new Cache(); + query.enablePersistentQueryCache(cache); + + const human1 = schemaObjects.human(); + const human2 = schemaObjects.human(); + + await collection.bulkInsert([human1, human2]); + await query.exec(); + + assert.strictEqual(cache.size, 2); + + collection.database.destroy(); + }); + + it('does not query from database after restoring from persistent query cache', async () => { + const {collection} = await setUpPersistentQueryCacheCollection(); + + const human1 = schemaObjects.human(); + const human2 = schemaObjects.human(); + + await collection.bulkInsert([human1, human2]); + + const query = collection.find({ limit: 2 }); + + // fill cache + const queryId = query.persistentQueryId(); + const cache = new Cache(); + await cache.setItem(`qc:${queryId}`, [human1.passportId, human2.passportId]); + await cache.setItem(`qc:${queryId}:lwt`, `${now()}`); + query.enablePersistentQueryCache(cache); + + // execute query + const result = await query.exec(); + + assert.strictEqual(result.length, 2); + assert.strictEqual(query._execOverDatabaseCount, 0); + + collection.database.destroy(); + }); + + it('does not query from database after modifying a document', async () => { + const {collection} = await setUpPersistentQueryCacheCollection(); + + const human1 = schemaObjects.human(); + const human1Age = human1.age; + + await collection.bulkInsert([human1]); + + const query1 = collection.find({ selector: { age: human1Age }}); + + // fill cache + const queryId = query1.persistentQueryId(); + const cache = new Cache(); + await cache.setItem(`qc:${queryId}`, [human1.passportId]); + await cache.setItem(`qc:${queryId}:lwt`, `${now()}`); + query1.enablePersistentQueryCache(cache); + + // execute query + const result1 = await query1.exec(); + assert.strictEqual(result1.length, 1); + + const human1Doc = result1[0]; + await human1Doc.modify(data => { + data.age += 1; + return data; + }); + + clearQueryCache(collection); + + const query2 = collection.find({ selector: { age: human1Age }}); + query2.enablePersistentQueryCache(cache); + + const result2 = await query2.exec(); + + assert.strictEqual(result1.length, 1); + assert.strictEqual(result2.length, 0); + assert.strictEqual(query1._execOverDatabaseCount, 0); + assert.strictEqual(query2._execOverDatabaseCount, 0); + + collection.database.destroy(); + }); + + it('does not query from database after adding an object', async () => { + const {collection} = await setUpPersistentQueryCacheCollection(); + + const human1 = schemaObjects.human(); + const human2 = schemaObjects.human(); + const human3 = schemaObjects.human(); + + await collection.bulkInsert([human1, human2]); + + const query = collection.find({ limit: 3 }); + const queryId = query.persistentQueryId(); + const cache = new Cache(); + await cache.setItem(`qc:${queryId}`, [human1.passportId, human2.passportId]); + await cache.setItem(`qc:${queryId}:lwt`, `${now()}`); + query.enablePersistentQueryCache(cache); + + const result1 = await query.exec(); + + await collection.insert(human3); + + const result2 = await query.exec(); + + assert.strictEqual(result1.length, 2); + assert.strictEqual(result2.length, 3); + assert.strictEqual(query._execOverDatabaseCount, 0); + + collection.database.destroy(); + }); + + it('does return docs from cache in correct order and with limits applied', async () => { + const {collection} = await setUpPersistentQueryCacheCollection(); + + const human1 = schemaObjects.human('1', 30); + const human2 = schemaObjects.human('2', 40); + const human3 = schemaObjects.human('3', 50); + + await collection.bulkInsert([human2, human3]); + + const query1 = collection.find({ limit: 2, sort: [{age: 'asc'}] }); + const queryId = query1.persistentQueryId(); + const lwt = now(); + + const cache = new Cache(); + await cache.setItem(`qc:${queryId}`, [human2.passportId, human3.passportId]); + await cache.setItem(`qc:${queryId}:lwt`, `${lwt}`); + + await collection.insert(human1); + + clearQueryCache(collection); + + const query2 = collection.find({ limit: 2, sort: [{age: 'asc'}] }); + query2.enablePersistentQueryCache(cache); + + const result2 = await query2.exec(); + + assert.strictEqual(query1._execOverDatabaseCount, 0); + assert.strictEqual(query2._execOverDatabaseCount, 0); + assert.deepStrictEqual(result2.map(item => item.passportId), ['1', '2']); + + collection.database.destroy(); + }); + + it('removing an item from the database, but not from cache does not lead to wrong results after restoring', async () => { + const {collection} = await setUpPersistentQueryCacheCollection(); + + const human1 = schemaObjects.human('1', 30); + const human2 = schemaObjects.human('2', 40); + const human3 = schemaObjects.human('3', 50); + + await collection.bulkInsert([human1, human2, human3]); + + const query1 = collection.find({ limit: 2, sort: [{age: 'asc'}] }); + const queryId = query1.persistentQueryId(); + const lwt = now(); + + const cache = new Cache(); + await cache.setItem(`qc:${queryId}`, [human1.passportId, human2.passportId, human3.passportId]); + await cache.setItem(`qc:${queryId}:lwt`, `${lwt}`); + + const removeQuery = collection.find({ selector: { passportId: '2' }}); + await removeQuery.remove(); + + clearQueryCache(collection); + + const query2 = collection.find({ limit: 2, sort: [{age: 'asc'}] }); + query2.enablePersistentQueryCache(cache); + + assert.strictEqual(cache.getItem(`qc:${queryId}`).length, 3); + + const result2 = await query2.exec(); + + assert.strictEqual(query1._execOverDatabaseCount, 0); + assert.strictEqual(query2._execOverDatabaseCount, 0); + assert.deepStrictEqual(result2.map(item => item.passportId), ['1', '3']); + + collection.database.destroy(); + }); + + it('old cache values are updated when documents are modified', async () => { + const {collection} = await setUpPersistentQueryCacheCollection(); + + const human1 = schemaObjects.human('1', 30); + + await collection.bulkInsert([human1]); + + // fill cache + const cache = new Cache(); + const query1 = collection.find({}); + query1.enablePersistentQueryCache(cache); + const queryId = query1.persistentQueryId(); + + const result1 = await query1.exec(); + assert.strictEqual(result1.length, 1); + assert.strictEqual(cache.size, 2); + + clearQueryCache(collection); + + // go back in time + const lwt = now() - 7200 * 1000; // go back in time (2hrs) + await cache.setItem(`qc:${queryId}:lwt`, `${lwt}`); + + const query2 = collection.find({}); + query2.enablePersistentQueryCache(cache); + await query2._persistentQueryCacheLoaded; + + await result1[0].modify(data => { + data.age = 40; + return data; + }); + + await query2.exec(); + + const currLwt = Number(await cache.getItem(`qc:${queryId}:lwt`)); + assert.strictEqual(currLwt > lwt, true); + + collection.database.destroy(); + }); + + it('query from database when cache is empty', async () => { + const {collection} = await setUpPersistentQueryCacheCollection(); + + const human1 = schemaObjects.human(); + await collection.bulkInsert([human1]); + + const query = collection.find({ limit: 3 }); + + const cache = new Cache(); + query.enablePersistentQueryCache(cache); + + const result = await query.exec(); + + assert.strictEqual(result.length, 1); + assert.strictEqual(query._execOverDatabaseCount, 1); + + collection.database.destroy(); + }); + }); });