diff --git a/src/ReactiveUI/ObservableForProperty/ObservableAsPropertyHelper.cs b/src/ReactiveUI/ObservableForProperty/ObservableAsPropertyHelper.cs index 316b212ae8..ae4cae191e 100644 --- a/src/ReactiveUI/ObservableForProperty/ObservableAsPropertyHelper.cs +++ b/src/ReactiveUI/ObservableForProperty/ObservableAsPropertyHelper.cs @@ -26,10 +26,14 @@ public sealed class ObservableAsPropertyHelper : IHandleObservableErrors, IDi { private readonly Lazy> _thrownExceptions; private readonly IObservable _source; - private readonly ISubject _subject; + private readonly IReactiveObject _reactiveObject; + private readonly string _propertyName; private T _lastValue; private CompositeDisposable _disposable = new CompositeDisposable(); private int _activated; + private readonly Action _onNext; + private readonly Action _onError; + private readonly IScheduler _scheduler; /// /// Initializes a new instance of the class. @@ -67,30 +71,39 @@ public ObservableAsPropertyHelper( Contract.Requires(observable != null); Contract.Requires(propertyName != null); - scheduler = scheduler ?? CurrentThreadScheduler.Instance; + _scheduler = scheduler; - _subject = new ScheduledSubject(scheduler); - _subject.Subscribe( - x => - { - IReactiveObjectExtensions.RaisePropertyChanging(reactiveObject, propertyName); - _lastValue = x; - IReactiveObjectExtensions.RaisePropertyChanged(reactiveObject, propertyName); - }, - ex => _thrownExceptions.Value.OnNext(ex)) - .DisposeWith(_disposable); + _onNext = x => + { + IReactiveObjectExtensions.RaisePropertyChanging(_reactiveObject, _propertyName); + _lastValue = x; + IReactiveObjectExtensions.RaisePropertyChanged(_reactiveObject, _propertyName); + }; + _onError = ex => _thrownExceptions.Value.OnNext(ex); - _thrownExceptions = new Lazy>(() => new ScheduledSubject(CurrentThreadScheduler.Instance, RxApp.DefaultExceptionHandler)); + _thrownExceptions = new Lazy>(() => + new ScheduledSubject(CurrentThreadScheduler.Instance, RxApp.DefaultExceptionHandler)); + _reactiveObject = reactiveObject; + _propertyName = propertyName; _lastValue = initialValue; _source = observable.StartWith(initialValue).DistinctUntilChanged(); if (!deferSubscription) { - _source.Subscribe(_subject).DisposeWith(_disposable); + if(_scheduler != null) + { + _source.ObserveOn(_scheduler).Subscribe(_onNext, _onError).DisposeWith(_disposable); + } + else + { + _source.Subscribe(_onNext, _onError).DisposeWith(_disposable); + } _activated = 1; } } + private void OnError(Exception ex) => _thrownExceptions.Value.OnNext(ex); + /// /// Gets the last provided value from the Observable. /// @@ -100,7 +113,14 @@ public T Value { if (Interlocked.CompareExchange(ref _activated, 1, 0) == 0) { - _source.Subscribe(_subject).DisposeWith(_disposable); + if (_scheduler != null) + { + _source.ObserveOn(_scheduler).Subscribe(_onNext, _onError).DisposeWith(_disposable); + } + else + { + _source.Subscribe(_onNext, _onError).DisposeWith(_disposable); + } } return _lastValue;