From 69eaacb1bffefc15461c04267676f2aa22ba3566 Mon Sep 17 00:00:00 2001 From: Shepherd Date: Tue, 4 Jun 2024 12:47:23 -0400 Subject: [PATCH] Add delta queue to operation repo Added a delta queue to the operation repo to flush all executor deltas at the same time. The operation repo's delta queue enqueues all deltas and then will sort and pass the delta to its correct executor. Previously, each executor was in charge of enqueuing and flushing their own delta queue. This change also aligns the Web SDK more with the iOS SDK's operation repo implementation. --- src/core/operationRepo/OperationRepo.ts | 40 +++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/src/core/operationRepo/OperationRepo.ts b/src/core/operationRepo/OperationRepo.ts index 2fdf8a120..1b78b4186 100644 --- a/src/core/operationRepo/OperationRepo.ts +++ b/src/core/operationRepo/OperationRepo.ts @@ -7,6 +7,8 @@ import { logMethodCall } from '../../shared/utils/utils'; export class OperationRepo { public executorStore: ExecutorStore; private _unsubscribeFromModelRepo: () => void; + private _deltaQueue: CoreDelta[] = []; + static DELTAS_BATCH_PROCESSING_TIME = 5; constructor(private modelRepo: ModelRepo) { this.executorStore = new ExecutorStore(); @@ -16,6 +18,12 @@ export class OperationRepo { this._processDelta(delta); }, ); + + setInterval(() => { + if (this._deltaQueue.length > 0) { + this._processDeltaQueue(); + } + }, OperationRepo.DELTAS_BATCH_PROCESSING_TIME * 1_000); } setModelRepoAndResubscribe(modelRepo: ModelRepo) { @@ -33,9 +41,35 @@ export class OperationRepo { this.executorStore.forceDeltaQueueProcessingOnAllExecutors(); } + private _flushDeltas(): void { + logMethodCall('OperationRepo._flushDeltas'); + this._deltaQueue = []; + } + private _processDelta(delta: CoreDelta): void { - logMethodCall('processDelta', { delta }); - const { modelName } = delta.model; - this.executorStore.store[modelName]?.enqueueDelta(delta); + // Todo: Rename to _enqueDelta - also change name in spy tests + logMethodCall('OperationRepo._processDelta', { delta }); + const deltaCopy = JSON.parse(JSON.stringify(delta)); + this._deltaQueue.push(deltaCopy); + } + + private _processDeltaQueue(): void { + logMethodCall('OperationRepo._processDeltaQueue'); + + this._deltaQueue.forEach((delta) => { + const { modelName } = delta.model; + + Log.debug('OperationRepo._processDeltaQueue: model Name: ' + modelName); + + this.executorStore.store[modelName]?.enqueueDelta(delta); + }); + + // for each executor + // TODO: fires SubscriptionExecutor.processDeltaQueue and SubscriptionExecutor._flushDeltas 3 times + // ExecutorStore has 3 ModelName for Subscriptions: smsSubscription, emailSubscription, pushSubscription + this.forceDeltaQueueProcessingOnAllExecutors(); + // executors flush is in the above method + + this._flushDeltas(); } }