Skip to content

Commit

Permalink
MediatorObservable always subscribes to sources (#104)
Browse files Browse the repository at this point in the history
Until now, the MediatorObservable only subscribed to sources when it was subscribes to. It also unsubscribed from all sources when it was unsubscribed from. This made MediatorObservable very unpredictable.
  • Loading branch information
guyca committed Jun 30, 2023
1 parent 1d4fe93 commit 3e7abc9
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 56 deletions.
52 changes: 3 additions & 49 deletions src/observable/MediatorObservable.ts
Original file line number Diff line number Diff line change
@@ -1,55 +1,9 @@
import { Observable } from './Observable';
import { OnNext, Unsubscribe, Observable as IObservable } from './types';

type Source<T> = {
source: Observable<T>;
onNext: OnNext<T>;
unsubscribe?: Unsubscribe;
};

export class MediatorObservable<T> implements IObservable<T> {
private subscribers: Set<OnNext<T>> = new Set();
private currentValue!: T;
private sources: Source<any>[] = [];

constructor(initialValue?: T) {
this.currentValue = initialValue as T;
}
import { OnNext } from './types';

export class MediatorObservable<T> extends Observable<T> {
addSource<S>(source: Observable<S>, onNext: OnNext<S>): MediatorObservable<T> {
this.sources.push({ source, onNext });
source.subscribe(onNext);
return this;
}

public get value(): T {
return this.currentValue;
}

public set value(value: T) {
this.currentValue = value;
this.subscribers.forEach((subscriber) => subscriber(value));
}

subscribe(onNext: OnNext<T>): Unsubscribe {
if (this.subscribers.has(onNext)) {
throw new Error('Subscriber already subscribed');
}
this.subscribers.add(onNext);

this.subscribeToAllSources();

return () => {
this.subscribers.delete(onNext);
this.sources.forEach(({ unsubscribe }) => unsubscribe?.());
};
}

private subscribeToAllSources() {
this.sources.forEach(({ source, onNext }, index) => {
const unsubscribe = source.subscribe((value) => {
onNext(value);
});
this.sources[index].unsubscribe = unsubscribe;
});
}
}
21 changes: 14 additions & 7 deletions src/observable/ObservableMediator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,14 @@ describe('ObservableMediator', () => {
expect(uut.value).toEqual(-2);
});

it('should unsubscribe from all sources when unsubscribed', () => {
it('should continue observing sources even if there are no subscribers', () => {
const a = new Observable<number>();
const b = new Observable<number>();

uut.addSource(a, NOOP);
uut.addSource(b, NOOP);
uut.addSource(a, (nextA) => { uut.value = nextA; });

const unsubscribe = uut.subscribe(NOOP);
unsubscribe();
expect(Reflect.get(a, 'subscribers').size).toEqual(0);
expect(Reflect.get(b, 'subscribers').size).toEqual(0);
a.value = 10;
expect(uut.value).toEqual(10);
});

it('should throw an error if a subscriber is already subscribed', () => {
Expand Down Expand Up @@ -123,4 +120,14 @@ describe('ObservableMediator', () => {
const mediator = new MediatorObservable(1);
expect(mediator.value).toEqual(1);
});

it('should subscribe to sources immediately', () => {
const a = new Observable(1);
uut.addSource(a, (nextA) => {
uut.value = nextA * 2;
});

a.value = 3;
expect(uut.value).toEqual(6);
});
});

0 comments on commit 3e7abc9

Please sign in to comment.