From 3e7abc907fba844ba56649f436efd2db93befcae Mon Sep 17 00:00:00 2001 From: Guy Carmeli Date: Fri, 30 Jun 2023 18:15:42 +0300 Subject: [PATCH] MediatorObservable always subscribes to sources (#104) Until now, the MediatorObservable only subscribed to sources when it was subscribes to. It also unsubscribed from all sources when it was unsubscribed from. This made MediatorObservable very unpredictable. --- src/observable/MediatorObservable.ts | 52 ++--------------------- src/observable/ObservableMediator.test.ts | 21 ++++++--- 2 files changed, 17 insertions(+), 56 deletions(-) diff --git a/src/observable/MediatorObservable.ts b/src/observable/MediatorObservable.ts index 07d3ad46..7c73faca 100644 --- a/src/observable/MediatorObservable.ts +++ b/src/observable/MediatorObservable.ts @@ -1,55 +1,9 @@ import { Observable } from './Observable'; -import { OnNext, Unsubscribe, Observable as IObservable } from './types'; - -type Source = { - source: Observable; - onNext: OnNext; - unsubscribe?: Unsubscribe; -}; - -export class MediatorObservable implements IObservable { - private subscribers: Set> = new Set(); - private currentValue!: T; - private sources: Source[] = []; - - constructor(initialValue?: T) { - this.currentValue = initialValue as T; - } +import { OnNext } from './types'; +export class MediatorObservable extends Observable { addSource(source: Observable, onNext: OnNext): MediatorObservable { - this.sources.push({ source, onNext }); + source.subscribe(onNext); return this; } - - public get value(): T { - return this.currentValue; - } - - public set value(value: T) { - this.currentValue = value; - this.subscribers.forEach((subscriber) => subscriber(value)); - } - - subscribe(onNext: OnNext): Unsubscribe { - if (this.subscribers.has(onNext)) { - throw new Error('Subscriber already subscribed'); - } - this.subscribers.add(onNext); - - this.subscribeToAllSources(); - - return () => { - this.subscribers.delete(onNext); - this.sources.forEach(({ unsubscribe }) => unsubscribe?.()); - }; - } - - private subscribeToAllSources() { - this.sources.forEach(({ source, onNext }, index) => { - const unsubscribe = source.subscribe((value) => { - onNext(value); - }); - this.sources[index].unsubscribe = unsubscribe; - }); - } } diff --git a/src/observable/ObservableMediator.test.ts b/src/observable/ObservableMediator.test.ts index 5cc6a505..26bf2b18 100644 --- a/src/observable/ObservableMediator.test.ts +++ b/src/observable/ObservableMediator.test.ts @@ -73,17 +73,14 @@ describe('ObservableMediator', () => { expect(uut.value).toEqual(-2); }); - it('should unsubscribe from all sources when unsubscribed', () => { + it('should continue observing sources even if there are no subscribers', () => { const a = new Observable(); - const b = new Observable(); - - uut.addSource(a, NOOP); - uut.addSource(b, NOOP); + uut.addSource(a, (nextA) => { uut.value = nextA; }); const unsubscribe = uut.subscribe(NOOP); unsubscribe(); - expect(Reflect.get(a, 'subscribers').size).toEqual(0); - expect(Reflect.get(b, 'subscribers').size).toEqual(0); + a.value = 10; + expect(uut.value).toEqual(10); }); it('should throw an error if a subscriber is already subscribed', () => { @@ -123,4 +120,14 @@ describe('ObservableMediator', () => { const mediator = new MediatorObservable(1); expect(mediator.value).toEqual(1); }); + + it('should subscribe to sources immediately', () => { + const a = new Observable(1); + uut.addSource(a, (nextA) => { + uut.value = nextA * 2; + }); + + a.value = 3; + expect(uut.value).toEqual(6); + }); });