Skip to content

Commit

Permalink
Merge pull request #101 from amilcar-andrade/rx-java-3
Browse files Browse the repository at this point in the history
RxMobius: Adds support for RxJava3
  • Loading branch information
pettermahlen authored Apr 22, 2020
2 parents 3f10ffc + e38d70a commit ca73696
Show file tree
Hide file tree
Showing 23 changed files with 2,077 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ testImplementation 'com.spotify.mobius:mobius-test:LATEST_RELEASE'
implementation 'com.spotify.mobius:mobius-rx:LATEST_RELEASE' // only for RxJava 1 support
implementation 'com.spotify.mobius:mobius-rx2:LATEST_RELEASE' // only for RxJava 2 support
implementation 'com.spotify.mobius:mobius-rx3:LATEST_RELEASE' // only for RxJava 3 support
implementation 'com.spotify.mobius:mobius-android:LATEST_RELEASE' // only for Android support
implementation 'com.spotify.mobius:mobius-extras:LATEST_RELEASE' // utilities for common patterns
```
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ ext {
'logback' : '1.2.3',
'rxJava' : '1.3.8',
'rxJava2' : '2.2.17',
'rxJava3' : '3.0.0',
'slf4j' : '1.7.25',
'jsr305' : '3.0.2',
'hamcrestLibrary' : '1.3',
Expand Down
33 changes: 33 additions & 0 deletions mobius-rx3/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
apply plugin: 'java-library'

dependencies {
api "io.reactivex.rxjava3:rxjava:${versions.rxJava3}"
api project(':mobius-core')

implementation "com.google.code.findbugs:jsr305:${versions.jsr305}"

testAnnotationProcessor "com.google.auto.value:auto-value:${versions.autoValue}"

testImplementation project(':mobius-test')
testImplementation "junit:junit:${versions.junit}"
testImplementation "com.google.guava:guava:${versions.guava}"
testImplementation "org.hamcrest:hamcrest-library:${versions.hamcrestLibrary}"
testImplementation "ch.qos.logback:logback-classic:${versions.logback}"
testImplementation "org.awaitility:awaitility:${versions.awaitility}"
testImplementation "com.google.auto.value:auto-value-annotations:${versions.autoValue}"
}

compileJava {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

test {
testLogging {
events "skipped", "failed"
exceptionFormat "full"
}
}

apply from: rootProject.file('gradle/gradle-mvn-push.gradle')
apply from: rootProject.file('gradle/jacoco-coverage.gradle')
3 changes: 3 additions & 0 deletions mobius-rx3/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
POM_ARTIFACT_ID=mobius-rx3
POM_NAME=RxJava3 tools for Mobius
POM_DESCRIPTION=RxJava3 utilities for use with Mobius
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* -\-\-
* Mobius
* --
* Copyright (c) 2017-2018 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/
package com.spotify.mobius.rx3;

import static com.spotify.mobius.internal_util.Preconditions.checkNotNull;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableTransformer;
import io.reactivex.rxjava3.functions.Function;
import java.util.ArrayList;
import java.util.List;

/**
* Utility that dispatches each item emitted from a source observable of type T to multiple other
* ObservableTransformers, and merges the results to a stream of type R.
*
* @param <T> input type
* @param <R> output type
*/
class MergedTransformer<T, R> implements ObservableTransformer<T, R> {

@NonNull private final Iterable<ObservableTransformer<T, R>> transformers;

MergedTransformer(@NonNull Iterable<ObservableTransformer<T, R>> transformers) {
this.transformers = checkNotNull(transformers);
}

@Override
public Observable<R> apply(@NonNull Observable<T> input) {
return input.publish(
new Function<Observable<T>, Observable<R>>() {
@Override
public Observable<R> apply(Observable<T> innerInput) throws Throwable {
final List<Observable<R>> transformed = new ArrayList<>();
for (ObservableTransformer<T, R> transformer : transformers) {
transformed.add(innerInput.compose(transformer));
}
return Observable.merge(transformed);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* -\-\-
* Mobius
* --
* Copyright (c) 2017-2018 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/
package com.spotify.mobius.rx3;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableSource;
import io.reactivex.rxjava3.core.ObservableTransformer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.functions.Predicate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Transformer that routes each incoming Effect descriptor to a sub-transformer associated with the
* Effect descriptor class.
*/
class MobiusEffectRouter<F, E> implements ObservableTransformer<F, E> {

@NonNull private final MergedTransformer<F, E> mergedTransformer;

MobiusEffectRouter(
@NonNull Set<Class<?>> handledEffectClasses,
@NonNull Collection<ObservableTransformer<F, E>> effectPerformers) {

final Set<Class<?>> effectClasses = new HashSet<>(handledEffectClasses);
final List<ObservableTransformer<F, E>> immutableEffectPerformers =
Collections.unmodifiableList(new ArrayList<>(effectPerformers));

final ObservableTransformer<F, E> unhandledEffectHandler =
new ObservableTransformer<F, E>() {
@Override
public @NonNull ObservableSource<E> apply(@NonNull Observable<F> effects) {
return effects
.filter(
new Predicate<F>() {
@Override
public boolean test(F e) throws Throwable {
for (Class<?> effectClass : effectClasses) {
if (effectClass.isAssignableFrom(e.getClass())) {
return false;
}
}
return true;
}
})
.map(
new Function<F, E>() {
@Override
public E apply(F e) throws Throwable {
throw new UnknownEffectException(e);
}
});
}
};
final List<ObservableTransformer<F, E>> allHandlers =
new ArrayList<>(immutableEffectPerformers);
allHandlers.add(unhandledEffectHandler);
mergedTransformer = new MergedTransformer<>(allHandlers);
}

@Override
public Observable<E> apply(@NonNull Observable<F> effects) {
return effects.compose(mergedTransformer);
}
}
138 changes: 138 additions & 0 deletions mobius-rx3/src/main/java/com/spotify/mobius/rx3/RxConnectables.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* -\-\-
* Mobius
* --
* Copyright (c) 2017-2018 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/
package com.spotify.mobius.rx3;

import static com.spotify.mobius.internal_util.Preconditions.checkNotNull;

import com.spotify.mobius.Connectable;
import com.spotify.mobius.Connection;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.core.ObservableSource;
import io.reactivex.rxjava3.core.ObservableTransformer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.Cancellable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.subjects.PublishSubject;
import javax.annotation.Nonnull;

/**
* Contains utility methods for converting back and forth between {@link ObservableTransformer}s and
* {@link Connectable}s.
*/
public final class RxConnectables {

private RxConnectables() {}

public static <I, O> Connectable<I, O> fromTransformer(
@NonNull final ObservableTransformer<I, O> transformer) {
checkNotNull(transformer);
return new Connectable<I, O>() {
@Nonnull
@Override
public Connection<I> connect(com.spotify.mobius.functions.Consumer<O> output) {
final PublishSubject<I> subject = PublishSubject.create();

final Disposable disposable =
subject
.compose(transformer)
.subscribe(
new Consumer<O>() {
@Override
public void accept(O value) throws Throwable {
output.accept(value);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable error) throws Throwable {
RxJavaPlugins.onError(error);
}
},
new Action() {
@Override
public void run() throws Throwable {}
});

return new Connection<I>() {
public void accept(I effect) {
subject.onNext(effect);
}

@Override
public void dispose() {
disposable.dispose();
}
};
}
};
}

@NonNull
public static <I, O> ObservableTransformer<I, O> toTransformer(
final Connectable<I, O> connectable) {
return new ObservableTransformer<I, O>() {
@Override
public @NonNull ObservableSource<O> apply(@NonNull Observable<I> upstream) {
return Observable.create(
new ObservableOnSubscribe<O>() {
@Override
public void subscribe(@NonNull ObservableEmitter<O> emitter) throws Throwable {
com.spotify.mobius.functions.Consumer<O> output = emitter::onNext;
final Connection<I> input = connectable.connect(output);
final Disposable disposable =
upstream.subscribe(
new Consumer<I>() {
@Override
public void accept(I value) throws Throwable {
input.accept(value);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable error) throws Throwable {
emitter.onError(error);
}
},
new Action() {
@Override
public void run() throws Throwable {
emitter.onComplete();
}
});

emitter.setCancellable(
new Cancellable() {
@Override
public void cancel() throws Throwable {
disposable.dispose();
input.dispose();
}
});
}
});
}
};
}
}
Loading

0 comments on commit ca73696

Please sign in to comment.