Skip to content

Commit

Permalink
Add delta queue to operation repo
Browse files Browse the repository at this point in the history
Added a delta queue to the operation repo to flush all executor deltas at the same time. The operation repo's delta queue enqueues all deltas and then will sort and pass the delta to its correct executor. Previously, each executor was in charge of enqueuing and flushing their own delta queue. This change also aligns the Web SDK more with the iOS SDK's operation repo implementation.
  • Loading branch information
shepherd-l committed Jun 4, 2024
1 parent e652544 commit 69eaacb
Showing 1 changed file with 37 additions and 3 deletions.
40 changes: 37 additions & 3 deletions src/core/operationRepo/OperationRepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { logMethodCall } from '../../shared/utils/utils';
export class OperationRepo {
public executorStore: ExecutorStore;
private _unsubscribeFromModelRepo: () => void;
private _deltaQueue: CoreDelta<SupportedModel>[] = [];
static DELTAS_BATCH_PROCESSING_TIME = 5;

constructor(private modelRepo: ModelRepo) {
this.executorStore = new ExecutorStore();
Expand All @@ -16,6 +18,12 @@ export class OperationRepo {
this._processDelta(delta);
},
);

setInterval(() => {
if (this._deltaQueue.length > 0) {
this._processDeltaQueue();
}
}, OperationRepo.DELTAS_BATCH_PROCESSING_TIME * 1_000);
}

setModelRepoAndResubscribe(modelRepo: ModelRepo) {
Expand All @@ -33,9 +41,35 @@ export class OperationRepo {
this.executorStore.forceDeltaQueueProcessingOnAllExecutors();
}

private _flushDeltas(): void {
logMethodCall('OperationRepo._flushDeltas');
this._deltaQueue = [];
}

private _processDelta(delta: CoreDelta<SupportedModel>): void {
logMethodCall('processDelta', { delta });
const { modelName } = delta.model;
this.executorStore.store[modelName]?.enqueueDelta(delta);
// Todo: Rename to _enqueDelta - also change name in spy tests
logMethodCall('OperationRepo._processDelta', { delta });
const deltaCopy = JSON.parse(JSON.stringify(delta));
this._deltaQueue.push(deltaCopy);
}

private _processDeltaQueue(): void {
logMethodCall('OperationRepo._processDeltaQueue');

this._deltaQueue.forEach((delta) => {
const { modelName } = delta.model;

Log.debug('OperationRepo._processDeltaQueue: model Name: ' + modelName);

this.executorStore.store[modelName]?.enqueueDelta(delta);
});

// for each executor
// TODO: fires SubscriptionExecutor.processDeltaQueue and SubscriptionExecutor._flushDeltas 3 times
// ExecutorStore has 3 ModelName for Subscriptions: smsSubscription, emailSubscription, pushSubscription
this.forceDeltaQueueProcessingOnAllExecutors();
// executors flush is in the above method

this._flushDeltas();
}
}

0 comments on commit 69eaacb

Please sign in to comment.