Skip to content

Commit

Permalink
ObservableAsPropertyHelper: Rewrite to avoid allocations on property …
Browse files Browse the repository at this point in the history
…update
  • Loading branch information
oysteinkrog committed Oct 14, 2019
1 parent 60017a2 commit 33c7825
Showing 1 changed file with 35 additions and 15 deletions.
50 changes: 35 additions & 15 deletions src/ReactiveUI/ObservableForProperty/ObservableAsPropertyHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ public sealed class ObservableAsPropertyHelper<T> : IHandleObservableErrors, IDi
{
private readonly Lazy<ISubject<Exception>> _thrownExceptions;
private readonly IObservable<T> _source;
private readonly ISubject<T> _subject;
private readonly IReactiveObject _reactiveObject;
private readonly string _propertyName;
private T _lastValue;
private CompositeDisposable _disposable = new CompositeDisposable();
private int _activated;
private readonly Action<T> _onNext;
private readonly Action<Exception> _onError;
private readonly IScheduler _scheduler;

/// <summary>
/// Initializes a new instance of the <see cref="ObservableAsPropertyHelper{T}"/> class.
Expand Down Expand Up @@ -67,30 +71,39 @@ public ObservableAsPropertyHelper(
Contract.Requires(observable != null);
Contract.Requires(propertyName != null);

scheduler = scheduler ?? CurrentThreadScheduler.Instance;
_scheduler = scheduler;

_subject = new ScheduledSubject<T>(scheduler);
_subject.Subscribe(
x =>
{
IReactiveObjectExtensions.RaisePropertyChanging(reactiveObject, propertyName);
_lastValue = x;
IReactiveObjectExtensions.RaisePropertyChanged(reactiveObject, propertyName);
},
ex => _thrownExceptions.Value.OnNext(ex))
.DisposeWith(_disposable);
_onNext = x =>
{
IReactiveObjectExtensions.RaisePropertyChanging(_reactiveObject, _propertyName);
_lastValue = x;
IReactiveObjectExtensions.RaisePropertyChanged(_reactiveObject, _propertyName);
};
_onError = ex => _thrownExceptions.Value.OnNext(ex);

_thrownExceptions = new Lazy<ISubject<Exception>>(() => new ScheduledSubject<Exception>(CurrentThreadScheduler.Instance, RxApp.DefaultExceptionHandler));
_thrownExceptions = new Lazy<ISubject<Exception>>(() =>
new ScheduledSubject<Exception>(CurrentThreadScheduler.Instance, RxApp.DefaultExceptionHandler));

_reactiveObject = reactiveObject;
_propertyName = propertyName;
_lastValue = initialValue;
_source = observable.StartWith(initialValue).DistinctUntilChanged();
if (!deferSubscription)
{
_source.Subscribe(_subject).DisposeWith(_disposable);
if(_scheduler != null)
{
_source.ObserveOn(_scheduler).Subscribe(_onNext, _onError).DisposeWith(_disposable);
}
else
{
_source.Subscribe(_onNext, _onError).DisposeWith(_disposable);
}
_activated = 1;
}
}

private void OnError(Exception ex) => _thrownExceptions.Value.OnNext(ex);

/// <summary>
/// Gets the last provided value from the Observable.
/// </summary>
Expand All @@ -100,7 +113,14 @@ public T Value
{
if (Interlocked.CompareExchange(ref _activated, 1, 0) == 0)
{
_source.Subscribe(_subject).DisposeWith(_disposable);
if (_scheduler != null)
{
_source.ObserveOn(_scheduler).Subscribe(_onNext, _onError).DisposeWith(_disposable);
}
else
{
_source.Subscribe(_onNext, _onError).DisposeWith(_disposable);
}
}

return _lastValue;
Expand Down

0 comments on commit 33c7825

Please sign in to comment.