Skip to content

Commit

Permalink
Another fix using coroutine criteria executor with Hibernate (#2786)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstepanov authored Feb 9, 2024
1 parent 4c98c5a commit d162bed
Show file tree
Hide file tree
Showing 16 changed files with 261 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.micronaut.data.runtime.operations.ExecutorAsyncOperations;
import io.micronaut.data.runtime.operations.ExecutorAsyncOperationsSupportingCriteria;
import io.micronaut.data.runtime.operations.ExecutorReactiveOperations;
import io.micronaut.data.runtime.operations.ExecutorReactiveOperationsSupportingCriteria;
import io.micronaut.transaction.TransactionOperations;
import jakarta.inject.Named;
import jakarta.persistence.EntityManager;
Expand Down Expand Up @@ -585,9 +586,9 @@ public ExecutorAsyncOperations async() {
@Override
public ReactiveRepositoryOperations reactive() {
if (dataConversionService instanceof DataConversionService asDataConversionService) {
return new ExecutorReactiveOperations(async(), asDataConversionService);
return new ExecutorReactiveOperationsSupportingCriteria((ExecutorAsyncOperationsSupportingCriteria) async(), asDataConversionService);
}
return new ExecutorReactiveOperations(async(), null);
return new ExecutorReactiveOperationsSupportingCriteria((ExecutorAsyncOperationsSupportingCriteria) async(), null);
}

@NonNull
Expand Down Expand Up @@ -630,6 +631,20 @@ public <T> List<T> findAll(CriteriaQuery<T> query) {
return executeRead(session -> session.createQuery(query).getResultList());
}

@Override
public <T> List<T> findAll(CriteriaQuery<T> query, int offset, int limit) {
return executeRead(session -> {
Query<T> sessionQuery = session.createQuery(query);
if (offset != -1) {
sessionQuery = sessionQuery.setFetchSize(offset);
}
if (limit != -1) {
sessionQuery = sessionQuery.setMaxResults(limit);
}
return sessionQuery.getResultList();
});
}

@Override
public Optional<Number> updateAll(CriteriaUpdate<Number> query) {
return Optional.ofNullable(executeWrite(session -> session.createMutationQuery(query).executeUpdate()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,20 @@ public <T> Flux<T> findAll(CriteriaQuery<T> query) {
.flatMapIterable(res -> res);
}

@Override
public <T> Flux<T> findAll(CriteriaQuery<T> query, int offset, int limit) {
return withSession(session -> helper.monoFromCompletionStage(() -> {
Stage.SelectionQuery<T> sessionQuery = session.createQuery(query);
if (offset != -1) {
sessionQuery = sessionQuery.setFirstResult(offset);
}
if (limit != -1) {
sessionQuery = sessionQuery.setMaxResults(limit);
}
return sessionQuery.getResultList();
})).flatMapIterable(res -> res);
}

@Override
public Mono<Number> updateAll(CriteriaUpdate<Number> query) {
return withSession(session -> helper.monoFromCompletionStage(() -> session.createQuery(query).executeUpdate()).map(n -> n));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ public interface CriteriaRepositoryOperations {
@NonNull
<T> List<T> findAll(@NonNull CriteriaQuery<T> query);

/**
* Finds all results for the given query.
* @param query The query
* @param offset The offset
* @param limit The limit
* @param <T> The generic type
* @return An iterable result
*/
@NonNull
<T> List<T> findAll(@NonNull CriteriaQuery<T> query, int offset, int limit);

/**
* Executes an update for the given query and parameter values. If it is possible to
* return the number of objects updated, then do so.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ default AsyncCriteriaRepositoryOperations async() {
*/
<T> CompletionStage<List<T>> findAll(@NonNull CriteriaQuery<T> query);

/**
* Finds all results for the given query.
*
* @param query The query
* @param offset The offset
* @param limit The limit
* @param <T> The generic type
* @return An iterable result
*/
<T> CompletionStage<List<T>> findAll(@NonNull CriteriaQuery<T> query, int offset, int limit);

/**
* Executes an update for the given query and parameter values. If it is possible to
* return the number of objects updated, then do so.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ default <T> List<T> findAll(@NonNull CriteriaQuery<T> query) {
.orElseGet(List::of);
}

@Override
default <T> List<T> findAll(@NonNull CriteriaQuery<T> query, int limit, int offset) {
return reactive().findAll(query, limit, offset)
.collectList()
.contextWrite(getContextView())
.blockOptional()
.orElseGet(List::of);
}

@Override
default Optional<Number> updateAll(@NonNull CriteriaUpdate<Number> query) {
return reactive().updateAll(query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ default ReactiveCriteriaRepositoryOperations reactive() {
@NonNull
<T> Publisher<T> findAll(@NonNull CriteriaQuery<T> query);

/**
* Finds all results for the given query.
* @param query The query
* @param offset The offset
* @param limit The limit
* @param <T> The generic type
* @return All result publisher
*/
@NonNull
<T> Publisher<T> findAll(@NonNull CriteriaQuery<T> query, int offset, int limit);

/**
* Executes an update for the given query and parameter values. If it is possible to
* return the number of objects updated, then do so.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public interface ReactorCriteriaRepositoryOperations extends ReactiveCriteriaRep
@Override
<T> Flux<T> findAll(@NonNull CriteriaQuery<T> query);

@Override
<T> Flux<T> findAll(@NonNull CriteriaQuery<T> query, int offset, int limit);

@Override
Mono<Number> updateAll(@NonNull CriteriaUpdate<Number> query);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,12 @@ public <E, K> PreparedQuery<E, K> decorate(PreparedQuery<E, K> preparedQuery) {
protected final Iterable<?> findAll(RepositoryMethodKey methodKey, MethodInvocationContext<T, R> context, Type type) {
Set<JoinPath> methodJoinPaths = getMethodJoinPaths(methodKey, context);
if (criteriaRepositoryOperations != null) {
return criteriaRepositoryOperations.findAll(buildQuery(context, type, methodJoinPaths));
CriteriaQuery<Object> query = buildQuery(context, type, methodJoinPaths);
Pageable pageable = getPageable(context);
if (pageable != null) {
return criteriaRepositoryOperations.findAll(query, (int) pageable.getOffset(), pageable.getSize());
}
return criteriaRepositoryOperations.findAll(query);
}
return operations.findAll(preparedQueryForCriteria(methodKey, context, type, methodJoinPaths));
}
Expand Down Expand Up @@ -314,7 +319,7 @@ private <E> StoredQuery<E, Object> buildFind(RepositoryMethodKey methodKey,
Type type,
Set<JoinPath> methodJoinPaths) {

CriteriaQuery<Object> criteriaQuery = buildQuery(context, type, methodJoinPaths);
CriteriaQuery<Object> criteriaQuery = buildInternalQuery(context, type, methodJoinPaths);
QueryBuilder sqlQueryBuilder = getQueryBuilder(methodKey, context);
QueryResultPersistentEntityCriteriaQuery queryModelCriteriaQuery = (QueryResultPersistentEntityCriteriaQuery) criteriaQuery;
QueryModel queryModel = queryModelCriteriaQuery.getQueryModel();
Expand All @@ -331,7 +336,7 @@ private <E> StoredQuery<E, Object> buildFind(RepositoryMethodKey methodKey,
criteriaQuery.getResultType(), !pageable.isUnpaged(), joinPaths);
}

protected final <N> CriteriaQuery<N> buildQuery(MethodInvocationContext<T, R> context, Type type, Set<JoinPath> methodJoinPaths) {
private <N> CriteriaQuery<N> buildInternalQuery(MethodInvocationContext<T, R> context, Type type, Set<JoinPath> methodJoinPaths) {
CriteriaQueryBuilder<N> builder = getCriteriaQueryBuilder(context, methodJoinPaths);
CriteriaQuery<N> criteriaQuery = builder.build(criteriaBuilder);

Expand All @@ -350,6 +355,22 @@ protected final <N> CriteriaQuery<N> buildQuery(MethodInvocationContext<T, R> co
return criteriaQuery;
}

protected final <N> CriteriaQuery<N> buildQuery(MethodInvocationContext<T, R> context, Type type, Set<JoinPath> methodJoinPaths) {
CriteriaQueryBuilder<N> builder = getCriteriaQueryBuilder(context, methodJoinPaths);
CriteriaQuery<N> criteriaQuery = builder.build(criteriaBuilder);

for (Object param : context.getParameterValues()) {
if (param instanceof Sort sort) {
if (sort.isSorted()) {
Root<?> root = criteriaQuery.getRoots().stream().findFirst().orElseThrow(() -> new IllegalStateException("The root not found!"));
criteriaQuery.orderBy(getOrders(sort, root, criteriaBuilder));
break;
}
}
}
return criteriaQuery;
}

/**
* Find {@link io.micronaut.data.repository.jpa.criteria.QuerySpecification} in context.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import io.micronaut.core.type.Argument;
import io.micronaut.data.exceptions.DataAccessException;
import io.micronaut.data.intercept.RepositoryMethodKey;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.query.JoinPath;
import io.micronaut.data.operations.RepositoryOperations;
import io.micronaut.data.operations.async.AsyncCapableRepository;
import io.micronaut.data.operations.async.AsyncCriteriaCapableRepository;
import io.micronaut.data.operations.async.AsyncCriteriaRepositoryOperations;
import io.micronaut.data.operations.async.AsyncRepositoryOperations;
import io.micronaut.data.runtime.intercept.criteria.AbstractSpecificationInterceptor;
import jakarta.persistence.criteria.CriteriaQuery;

import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -76,7 +78,12 @@ protected AbstractAsyncSpecificationInterceptor(RepositoryOperations operations)
protected final CompletionStage<Iterable<Object>> findAllAsync(RepositoryMethodKey methodKey, MethodInvocationContext<T, R> context, Type type) {
Set<JoinPath> methodJoinPaths = getMethodJoinPaths(methodKey, context);
if (asyncCriteriaOperations != null) {
return asyncCriteriaOperations.findAll(buildQuery(context, type, methodJoinPaths)).thenApply(m -> m);
CriteriaQuery<Object> criteriaQuery = buildQuery(context, type, methodJoinPaths);
Pageable pageable = getPageable(context);
if (pageable != null) {
return asyncCriteriaOperations.findAll(criteriaQuery, (int) pageable.getOffset(), pageable.getSize()).thenApply(m -> m);
}
return asyncCriteriaOperations.findAll(criteriaQuery).thenApply(m -> m);
}
return asyncOperations.findAll(preparedQueryForCriteria(methodKey, context, type, methodJoinPaths));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import io.micronaut.core.annotation.NonNull;
import io.micronaut.data.exceptions.DataAccessException;
import io.micronaut.data.intercept.RepositoryMethodKey;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.query.JoinPath;
import io.micronaut.data.operations.RepositoryOperations;
import io.micronaut.data.operations.reactive.ReactiveCapableRepository;
import io.micronaut.data.operations.reactive.ReactiveCriteriaCapableRepository;
import io.micronaut.data.operations.reactive.ReactiveCriteriaRepositoryOperations;
import io.micronaut.data.operations.reactive.ReactiveRepositoryOperations;
import io.micronaut.data.runtime.intercept.criteria.AbstractSpecificationInterceptor;
import jakarta.persistence.criteria.CriteriaQuery;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -56,7 +58,9 @@ protected AbstractReactiveSpecificationInterceptor(RepositoryOperations operatio
} else {
throw new DataAccessException("Datastore of type [" + operations.getClass() + "] does not support reactive operations");
}
if (operations instanceof ReactiveCriteriaRepositoryOperations reactiveCriteriaRepositoryOperations) {
if (reactiveOperations instanceof ReactiveCriteriaRepositoryOperations reactiveCriteriaRepositoryOperations) {
reactiveCriteriaOperations = reactiveCriteriaRepositoryOperations;
} else if (operations instanceof ReactiveCriteriaRepositoryOperations reactiveCriteriaRepositoryOperations) {
reactiveCriteriaOperations = reactiveCriteriaRepositoryOperations;
} else if (operations instanceof ReactiveCriteriaCapableRepository repository) {
reactiveCriteriaOperations = repository.reactive();
Expand All @@ -69,7 +73,12 @@ protected AbstractReactiveSpecificationInterceptor(RepositoryOperations operatio
protected final Publisher<Object> findAllReactive(RepositoryMethodKey methodKey, MethodInvocationContext<T, R> context, Type type) {
Set<JoinPath> methodJoinPaths = getMethodJoinPaths(methodKey, context);
if (reactiveCriteriaOperations != null) {
return reactiveCriteriaOperations.findAll(buildQuery(context, type, methodJoinPaths));
CriteriaQuery<Object> criteriaQuery = buildQuery(context, type, methodJoinPaths);
Pageable pageable = getPageable(context);
if (pageable != null) {
return reactiveCriteriaOperations.findAll(criteriaQuery, (int) pageable.getOffset(), pageable.getSize());
}
return reactiveCriteriaOperations.findAll(criteriaQuery);
}
return reactiveOperations.findAll(preparedQueryForCriteria(methodKey, context, type, methodJoinPaths));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public ExecutorAsyncOperations(@NonNull RepositoryOperations operations, @NonNul
}

@Internal
<T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
final <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
CompletableFuture<T> cf = new CompletableFuture<>();
PropagatedContext propagatedContext = PropagatedContext.getOrEmpty();
CompletableFuture.supplyAsync(PropagatedContext.wrapCurrent(supplier), executor).whenComplete((value, throwable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public <T> CompletionStage<List<T>> findAll(CriteriaQuery<T> query) {
return supplyAsync(() -> criteriaRepositoryOperations.findAll(query));
}

@Override
public <T> CompletionStage<List<T>> findAll(CriteriaQuery<T> query, int offset, int limit) {
return supplyAsync(() -> criteriaRepositoryOperations.findAll(query, offset, limit));
}

@Override
public CompletionStage<Number> updateAll(CriteriaUpdate<Number> query) {
return supplyAsync(() -> criteriaRepositoryOperations.updateAll(query).orElse(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public <T> Publisher<Number> deleteAll(@NonNull DeleteBatchOperation<T> operatio
.map(number -> convertNumberArgumentIfNecessary(number, operation.getResultArgument()));
}

private <R> Mono<R> fromCompletableFuture(Supplier<CompletableFuture<R>> futureSupplier) {
protected final <R> Mono<R> fromCompletableFuture(Supplier<CompletableFuture<R>> futureSupplier) {
return Mono.fromCompletionStage(PropagatedContext.wrapCurrent(futureSupplier));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.data.runtime.operations;

import io.micronaut.core.annotation.Experimental;
import io.micronaut.data.operations.reactive.ReactiveCriteriaRepositoryOperations;
import io.micronaut.data.runtime.convert.DataConversionService;
import jakarta.persistence.criteria.CriteriaBuilder;
import jakarta.persistence.criteria.CriteriaDelete;
import jakarta.persistence.criteria.CriteriaQuery;
import jakarta.persistence.criteria.CriteriaUpdate;
import org.reactivestreams.Publisher;

/**
* A variation of {@link ExecutorReactiveOperations} that supports {@link ReactiveCriteriaRepositoryOperations}.
* @author Denis Stepanov
*/
@Experimental
public class ExecutorReactiveOperationsSupportingCriteria extends ExecutorReactiveOperations implements ReactiveCriteriaRepositoryOperations {

private final ExecutorAsyncOperationsSupportingCriteria asyncOperations;

public ExecutorReactiveOperationsSupportingCriteria(ExecutorAsyncOperationsSupportingCriteria asyncOperations,
DataConversionService dataConversionService) {
super(asyncOperations, dataConversionService);
this.asyncOperations = asyncOperations;
}

@Override
public CriteriaBuilder getCriteriaBuilder() {
return asyncOperations.getCriteriaBuilder();
}

@Override
public <R> Publisher<R> findOne(CriteriaQuery<R> query) {
return fromCompletableFuture(() -> asyncOperations.findOne(query).toCompletableFuture());
}

@Override
public <T> Publisher<T> findAll(CriteriaQuery<T> query) {
return fromCompletableFuture(() -> asyncOperations.findAll(query).toCompletableFuture()).flatMapIterable(list -> list);
}

@Override
public <T> Publisher<T> findAll(CriteriaQuery<T> query, int offset, int limit) {
return fromCompletableFuture(() -> asyncOperations.findAll(query, offset, limit).toCompletableFuture()).flatMapIterable(list -> list);
}

@Override
public Publisher<Number> updateAll(CriteriaUpdate<Number> query) {
return fromCompletableFuture(() -> asyncOperations.updateAll(query).toCompletableFuture());
}

@Override
public Publisher<Number> deleteAll(CriteriaDelete<Number> query) {
return fromCompletableFuture(() -> asyncOperations.deleteAll(query).toCompletableFuture());
}
}
1 change: 1 addition & 0 deletions doc-examples/hibernate-example-kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
implementation mnKotlin.micronaut.kotlin.runtime
implementation mnValidation.micronaut.validation
implementation libs.kotlin.coroutines
implementation libs.kotlin.coroutines.reactive

kapt mnValidation.micronaut.validation
kapt projects.micronautDataProcessor
Expand Down
Loading

0 comments on commit d162bed

Please sign in to comment.