Skip to content

Commit

Permalink
Relax type restrictions in Promises.partitioned + preparing next release
Browse files Browse the repository at this point in the history
  • Loading branch information
vsilaev committed Aug 25, 2023
1 parent 137ea7b commit 8680f26
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 55 deletions.
9 changes: 8 additions & 1 deletion .classpath
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<classpath>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="test" value="true"/>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
Expand All @@ -17,7 +18,7 @@
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JDK 12">
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-12">
<attributes>
<attribute name="module" value="true"/>
<attribute name="maven.pomderived" value="true"/>
Expand All @@ -28,5 +29,11 @@
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="test" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>
10 changes: 5 additions & 5 deletions .settings/org.eclipse.jdt.core.prefs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.codegen.targetPlatform=12
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.compliance=12
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.8
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=warning
org.eclipse.jdt.core.compiler.release=enabled
org.eclipse.jdt.core.compiler.source=12
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![Maven Central](https://img.shields.io/maven-central/v/net.tascalate/net.tascalate.concurrent.svg)](https://search.maven.org/artifact/net.tascalate/net.tascalate.concurrent/0.9.6/jar) [![GitHub release](https://img.shields.io/github/release/vsilaev/tascalate-concurrent.svg)](https://github.com/vsilaev/tascalate-concurrent/releases/tag/0.9.6) [![license](https://img.shields.io/github/license/vsilaev/tascalate-concurrent.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
[![Maven Central](https://img.shields.io/maven-central/v/net.tascalate/net.tascalate.concurrent.svg)](https://search.maven.org/artifact/net.tascalate/net.tascalate.concurrent/0.9.7/jar) [![GitHub release](https://img.shields.io/github/release/vsilaev/tascalate-concurrent.svg)](https://github.com/vsilaev/tascalate-concurrent/releases/tag/0.9.7) [![license](https://img.shields.io/github/license/vsilaev/tascalate-concurrent.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
# tascalate-concurrent
The library provides an implementation of the [CompletionStage](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html) interface and related classes these are designed to support long-running blocking tasks (typically, I/O bound). This functionality augments the sole Java 8 built-in implementation, [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html), that is primarily supports computational tasks. Also, the library helps with numerous asynchronous programing challenges like handling timeouts, retry/poll functionality, orchestrating results of multiple concurrent computations and similar.

Expand All @@ -12,7 +12,7 @@ New name:
<dependency>
<groupId>net.tascalate</groupId>
<artifactId>net.tascalate.concurrent</artifactId>
<version>0.9.6</version> <!-- Any version above 0.8.0, the latest one is recommended -->
<version>0.9.7</version> <!-- Any version above 0.8.0, the latest one is recommended -->
</dependency>
```
Old Name
Expand All @@ -38,7 +38,7 @@ To use a library you have to add a single Maven dependency
<dependency>
<groupId>net.tascalate</groupId>
<artifactId>net.tascalate.concurrent</artifactId>
<version>0.9.6</version>
<version>0.9.7</version>
</dependency>
```
# What is inside?
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>net.tascalate</groupId>
<artifactId>net.tascalate.concurrent</artifactId>
<version>0.9.6</version>
<version>0.9.7</version>
<packaging>jar</packaging>

<name>Tascalate Concurrent</name>
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/net/tascalate/concurrent/PromiseOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ public static <T, F extends Promise<T>> Function<F, F> peek(Consumer<? super F>
return p -> unwrap(Promises.tryComposeEx(p.dependent(PromiseOrigin.ALL), fn));
}

public static <T, A, R> Function<Promise<Iterable<T>>, Promise<R>>
public static <S, T, A, R> Function<Promise<Iterable<S>>, Promise<R>>
partitionedItems(int batchSize,
Function<? super T, CompletionStage<? extends T>> spawner,
Function<? super S, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream) {

return p -> p.dependent()
Expand All @@ -105,9 +105,9 @@ public static <T, F extends Promise<T>> Function<F, F> peek(Consumer<? super F>
.unwrap();
}

public static <T, A, R> Function<Promise<Iterable<T>>, Promise<R>>
public static <S, T, A, R> Function<Promise<Iterable<S>>, Promise<R>>
partitionedItems(int batchSize,
Function<? super T, CompletionStage<? extends T>> spawner,
Function<? super S, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream,
Executor downstreamExecutor) {

Expand All @@ -117,9 +117,9 @@ public static <T, F extends Promise<T>> Function<F, F> peek(Consumer<? super F>
.unwrap();
}

public static <T, A, R> Function<Promise<Stream<T>>, Promise<R>>
public static <S, T, A, R> Function<Promise<Stream<S>>, Promise<R>>
partitionedStream(int batchSize,
Function<? super T, CompletionStage<? extends T>> spawner,
Function<? super S, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream) {

return p -> p.dependent()
Expand All @@ -128,9 +128,9 @@ public static <T, F extends Promise<T>> Function<F, F> peek(Consumer<? super F>
.unwrap();
}

public static <T, A, R> Function<Promise<Stream<T>>, Promise<R>>
public static <S, T, A, R> Function<Promise<Stream<S>>, Promise<R>>
partitionedStream(int batchSize,
Function<? super T, CompletionStage<? extends T>> spawner,
Function<? super S, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream,
Executor downstreamExecutor) {

Expand Down
74 changes: 37 additions & 37 deletions src/main/java/net/tascalate/concurrent/Promises.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,41 +253,41 @@ public static <T, R extends AsyncCloseable> Promise<T> tryComposeEx(Promise<R> p
});
}

public static <T, A, R> Promise<R> partitioned(Iterable<? extends T> values,
int batchSize,
Function<? super T, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream) {
public static <S, T, A, R> Promise<R> partitioned(Iterable<? extends S> values,
int batchSize,
Function<? super S, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream) {
return partitioned1(values.iterator(), null, batchSize, spawner, downstream);
}

public static <T, A, R> Promise<R> partitioned(Iterable<? extends T> values,
int batchSize,
Function<? super T, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream,
Executor downstreamExecutor) {
public static <S, T, A, R> Promise<R> partitioned(Iterable<? extends S> values,
int batchSize,
Function<? super S, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream,
Executor downstreamExecutor) {
return partitioned2(values.iterator(), null, batchSize, spawner, downstream, downstreamExecutor);
}

public static <T, A, R> Promise<R> partitioned(Stream<? extends T> values,
int batchSize,
Function<? super T, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream) {
public static <S, T, A, R> Promise<R> partitioned(Stream<? extends S> values,
int batchSize,
Function<? super S, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream) {
return partitioned1(values.iterator(), values, batchSize, spawner, downstream);
}

public static <T, A, R> Promise<R> partitioned(Stream<? extends T> values,
int batchSize,
Function<? super T, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream,
Executor downstreamExecutor) {
public static <S, T, A, R> Promise<R> partitioned(Stream<? extends S> values,
int batchSize,
Function<? super S, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream,
Executor downstreamExecutor) {
return partitioned2(values.iterator(), values, batchSize, spawner, downstream, downstreamExecutor);
}

private static <T, A, R> Promise<R> partitioned1(Iterator<? extends T> values,
Object source,
int batchSize,
Function<? super T, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream) {
private static <S, T, A, R> Promise<R> partitioned1(Iterator<? extends S> values,
Object source,
int batchSize,
Function<? super S, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream) {
return
parallelStep1(values, batchSize, spawner, downstream)
.dependent()
Expand All @@ -296,12 +296,12 @@ private static <T, A, R> Promise<R> partitioned1(Iterator<? extends T> values,
.unwrap();
}

private static <T, A, R> Promise<R> partitioned2(Iterator<? extends T> values,
Object source,
int batchSize,
Function<? super T, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream,
Executor downstreamExecutor) {
private static <S, T, A, R> Promise<R> partitioned2(Iterator<? extends S> values,
Object source,
int batchSize,
Function<? super S, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream,
Executor downstreamExecutor) {
return
parallelStep2(values, batchSize, spawner, downstream, downstreamExecutor)
.dependent()
Expand All @@ -310,13 +310,13 @@ private static <T, A, R> Promise<R> partitioned2(Iterator<? extends T> values,
.unwrap();
}

private static <T, A, R> Promise<IndexedStep<A>> parallelStep1(
Iterator<? extends T> values, int batchSize,
Function<? super T, CompletionStage<? extends T>> spawner,
private static <S, T, A, R> Promise<IndexedStep<A>> parallelStep1(
Iterator<? extends S> values, int batchSize,
Function<? super S, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream) {

return loop(new IndexedStep<>(), step -> step.initial() || values.hasNext(), step -> {
List<T> valuesBatch = drainBatch(values, batchSize);
List<S> valuesBatch = drainBatch(values, batchSize);
if (valuesBatch.isEmpty()) {
// Over
return Promises.success(step.initial() ? step.next(downstream.supplier().get()) : step);
Expand All @@ -336,14 +336,14 @@ private static <T, A, R> Promise<IndexedStep<A>> parallelStep1(
});
}

private static <T, A, R> Promise<IndexedStep<A>> parallelStep2(
Iterator<? extends T> values, int batchSize,
Function<? super T, CompletionStage<? extends T>> spawner,
private static <S, T, A, R> Promise<IndexedStep<A>> parallelStep2(
Iterator<? extends S> values, int batchSize,
Function<? super S, CompletionStage<? extends T>> spawner,
Collector<T, A, R> downstream,
Executor downstreamExecutor) {

return loop(new IndexedStep<>(), step -> step.initial() || values.hasNext(), step -> {
List<T> valuesBatch = drainBatch(values, batchSize);
List<S> valuesBatch = drainBatch(values, batchSize);
if (valuesBatch.isEmpty()) {
// Over
return step.initial() ?
Expand Down

0 comments on commit 8680f26

Please sign in to comment.