Skip to content

Commit

Permalink
Merge pull request #1 from slatepowered/cached-bulk-ops
Browse files Browse the repository at this point in the history
Cached bulk operations
  • Loading branch information
orbyfied authored Dec 10, 2023
2 parents e316d7f + d929c6d commit e315eaa
Show file tree
Hide file tree
Showing 26 changed files with 932 additions and 462 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.Stream;

/**
* Caffeine-cache based implementation of {@link DataCache}.
Expand Down Expand Up @@ -51,6 +52,11 @@ public int size() {
return cache.asMap().size();
}

// Expose the cached items as a Stream over Caffeine's live map view.
// NOTE(review): asMap() is a view, so the stream reflects concurrent
// inserts/evictions with weakly-consistent semantics — confirm callers
// do not require a snapshot.
@Override
public Stream<DataItem<K, T>> stream() {
return cache.asMap().values().stream();
}

@Override
public Iterator<DataItem<K, T>> iterator() {
return cache.asMap().values().iterator(); // this kinda sucks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.stream.Stream;

/**
* Provides caching capabilities for data items in datastores with
Expand Down Expand Up @@ -47,6 +48,13 @@ public interface DataCache<K, T> extends Iterable<DataItem<K, T>> {
*/
int size();

/**
 * Stream the values in this cache.
 *
 * <p>Implementations typically delegate to their backing collection, so
 * whether the stream is a snapshot or a live view is implementation-defined.
 *
 * @return A stream over every {@code DataItem} currently held by this cache.
 */
Stream<DataItem<K, T>> stream();

/**
* A simple, permanent data cache backed by a {@link ConcurrentHashMap} for
* fast lookup and a {@link Vector} for fast iteration. With no special mechanisms,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.util.*;
import java.util.function.Function;
import java.util.stream.Stream;

/**
* A thread-safe double backed (by a list and map) implementation of
Expand Down Expand Up @@ -43,6 +44,11 @@ public Iterator<DataItem<K, T>> iterator() {
return list.iterator();
}

// Stream directly over the backing list, which this implementation
// maintains for fast iteration alongside the lookup map.
@Override
public Stream<DataItem<K, T>> stream() {
return list.stream();
}

@Override
public int size() {
int r;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package slatepowered.inset.codec;

import slatepowered.inset.datastore.DataItem;
import slatepowered.inset.modifier.Projection;
import slatepowered.inset.operation.Projection;
import slatepowered.inset.query.Query;

import java.util.function.Predicate;
Expand Down
10 changes: 10 additions & 0 deletions inset-core/src/main/java/slatepowered/inset/codec/ValueCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ default <C extends ValueCodec<T>> C expect(Class<? super C> cClass) {
*/
void decode(CodecContext context, T instance, DecodeInput input);

/**
 * Get the value of the named field on the given instance.
 *
 * <p>NOTE(review): the return is an unchecked cast to {@code V}; a caller
 * supplying the wrong type parameter will only fail at use site — confirm
 * implementations document the runtime type per field.
 *
 * @param instance The instance to read from.
 * @param field The field name.
 * @param <V> The expected field value type.
 * @return The value of the field or null if absent.
 */
<V> V getField(T instance, String field);

default T constructAndDecode(CodecContext context, DecodeInput input) {
T instance = construct(context, input);
decode(context, instance, input);
Expand Down
77 changes: 59 additions & 18 deletions inset-core/src/main/java/slatepowered/inset/datastore/DataItem.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package slatepowered.inset.datastore;

import slatepowered.inset.codec.CodecContext;
import slatepowered.inset.codec.DataCodec;
import slatepowered.inset.codec.DecodeInput;
import slatepowered.inset.codec.EncodeOutput;
import slatepowered.inset.codec.*;
import slatepowered.inset.operation.Sorting;
import slatepowered.inset.query.FoundItem;
import slatepowered.inset.query.Query;
import slatepowered.inset.source.DataSourceFindResult;
import slatepowered.inset.source.DataTable;

import java.lang.reflect.Type;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -19,7 +19,7 @@
* @param <K> The primary key type.
* @param <T> The data type.
*/
public class DataItem<K, T> {
public class DataItem<K, T> extends FoundItem<K, T> {

public DataItem(Datastore<K, T> datastore, K key) {
this.datastore = datastore;
Expand Down Expand Up @@ -60,6 +60,9 @@ public DataItem(Datastore<K, T> datastore, K key) {
*/
protected volatile int lastReferenceTime;

private double[] cachedOrderCoefficient; // The cached order coefficient array
private int currentSortId; // The ID the cached order coefficient is for

/**
* Get the primary key for this item.
*
Expand Down Expand Up @@ -237,6 +240,41 @@ public DataItem<K, T> fetchSync() {
return decode(queryResult.input()).fetchedNow();
}

// A DataItem always represents a full record (never a projection),
// so it reports itself as non-partial unconditionally.
@Override
public boolean isPartial() {
return false;
}

// Not implemented yet: a DataItem holds an already-decoded value rather
// than a raw decode input, so an input would have to be synthesized from
// the value. Throws until that is implemented.
@Override
public DecodeInput input() {
throw new UnsupportedOperationException("TODO: Create decode input for DataItem"); // TODO
}

// The key is already materialized on this item, so the field name and
// expected type hints are ignored and the stored key is returned directly.
@Override
public K getOrReadKey(String fieldName, Type expectedType) {
return key;
}

// Direct accessor for the primary key of this item.
@Override
public K getKey() {
return key;
}

/**
 * Read the named field from this item's decoded value via the datastore's
 * data codec. The {@code expectedType} hint is not used by this implementation.
 */
@Override
public <V> V getField(String fieldName, Type expectedType) {
    // No decoded value present: nothing to read.
    if (value == null) {
        return null;
    }

    DataCodec<K, T> codec = datastore.getDataCodec();
    return codec.getField(value, fieldName);
}

// Not implemented yet: projecting an already-decoded item onto another type.
// NOTE(review): this returns null instead of throwing, unlike input() above —
// callers must null-check until projection is implemented; confirm this
// asymmetry is intentional.
@Override
public <V> V project(Class<V> vClass) {
return null; // TODO
}

// A DataItem is already the fetched representation, so fetch() is a
// no-op that returns this item itself.
@Override
public DataItem<K, T> fetch() {
return this;
}

/**
* Asynchronously fetch and decode the value for this data item.
*
Expand All @@ -248,6 +286,22 @@ public CompletableFuture<DataItem<K, T>> fetchAsync() {
.thenApply(result -> this.decode(result.input()).fetchedNow());
}

/**
 * Build the per-field numeric coefficients used for fast ordering of
 * results. Each output slot corresponds to the same index in {@code fields};
 * only fields whose decoded value is a {@link Number} contribute, all other
 * slots keep the default {@code 0.0}.
 *
 * @param fields The field names to sample, in order.
 * @param sorting The sorting descriptor (not consulted by this implementation).
 * @return An array of coefficients, one per requested field.
 */
@Override
public double[] createFastOrderCoefficients(String[] fields, Sorting sorting) {
    final int len = fields.length;
    final double[] arr = new double[len];

    // Guard against an absent value (matches the null check in getField):
    // without a decoded value every coefficient stays at the default 0.0
    // instead of risking an NPE inside the codec.
    if (value == null) {
        return arr;
    }

    final DataCodec<K, T> codec = datastore.getDataCodec();
    for (int i = 0; i < len; i++) {
        Object obj = codec.getField(value, fields[i]);
        if (obj instanceof Number) {
            arr[i] = ((Number) obj).doubleValue();
        }
    }

    return arr;
}

// Update the lastReferenceTime to represent the
// current instant
protected DataItem<K, T> referencedNow() {
Expand All @@ -273,17 +327,4 @@ public String toString() {
return "DataItem(" + key + " = " + value + ')';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DataItem<?, ?> dataItem = (DataItem<?, ?>) o;
return Objects.equals(datastore, dataItem.datastore) && Objects.equals(key, dataItem.key);
}

@Override
public int hashCode() {
return Objects.hash(datastore, key);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@
import slatepowered.inset.codec.CodecRegistry;
import slatepowered.inset.codec.DataCodec;
import slatepowered.inset.codec.DecodeInput;
import slatepowered.inset.query.FindAllStatus;
import slatepowered.inset.query.FindAllOperation;
import slatepowered.inset.query.Query;
import slatepowered.inset.query.FindResult;
import slatepowered.inset.query.FindStatus;
import slatepowered.inset.query.FindOperation;
import slatepowered.inset.source.DataTable;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Represents a datastore of values of type {@code T} with primary key type {@code K}.
Expand Down Expand Up @@ -180,17 +182,17 @@ public DataItem<K, T> findOneCached(Query query) {
* @return The query status object.
*/
@SuppressWarnings("unchecked")
public FindStatus<K, T> findOne(Query query) {
public FindOperation<K, T> findOne(Query query) {
DataItem<K, T> cachedItem = findOneCached(query);
if (cachedItem != null) {
return new FindStatus<>(this, query).completeSuccessfully(FindResult.CACHED, cachedItem);
return new FindOperation<>(this, query).completeSuccessfully(FindResult.CACHED, cachedItem);
}

query = query.qualify(this);

// asynchronously try to load the item
// from the datatable
FindStatus<K, T> queryStatus = new FindStatus<>(this, query);
FindOperation<K, T> queryStatus = new FindOperation<>(this, query);
getSourceTable().findOneAsync(query)
.whenComplete((result, throwable) -> {
if (throwable != null) {
Expand Down Expand Up @@ -218,7 +220,7 @@ public FindStatus<K, T> findOne(Query query) {
* @param key The key.
* @return This.
*/
public FindStatus<K, T> findOne(K key) {
public FindOperation<K, T> findOne(K key) {
// Convenience overload: wrap the primary key in a key query and delegate.
return findOne(Query.byKey(key));
}

Expand Down Expand Up @@ -252,6 +254,10 @@ public List<DataItem<K, T>> findAllCached(Query query) {
return list;
}

// Default options for findAll(Query): caches are disabled, so the
// aggregation always references the database rather than merging in
// locally cached items.
static final FindAllOperation.Options DEFAULT_FIND_ALL_OPTIONS = FindAllOperation.Options.builder()
.useCaches(false)
.build();

/**
* Find all items matching the given query in the database.
*
Expand All @@ -262,8 +268,31 @@ public List<DataItem<K, T>> findAllCached(Query query) {
* @param query The filter query.
* @return The status of the operation.
*/
public FindAllStatus<K, T> findAll(Query query) {
FindAllStatus<K, T> status = new FindAllStatus<>(this, query);
public FindAllOperation<K, T> findAll(Query query) {
// Delegate to the options-aware overload using the cache-disabled defaults.
return findAll(query, DEFAULT_FIND_ALL_OPTIONS);
}

/**
* Find all items matching the given query in the database.
*
* Note that the aggregation/find operation is never cached if {@code useCaches} is
* disabled and always references the database, the individual items may be
* resolved from the cache or cached though.
*
* @param query The filter query.
* @param options The options for this operation.
* @return The status of the operation.
*/
public FindAllOperation<K, T> findAll(Query query, FindAllOperation.Options options) {
FindAllOperation<K, T> status = new FindAllOperation<>(this, query, options);

// filter cached item stream
if (options.isUseCaches()) {
final Predicate<T> filterPredicate = dataCodec.getFilterPredicate(query);
Stream<DataItem<K, T>> cachedStream = dataCache.stream().filter(dataItem -> dataItem.isPresent() && filterPredicate.test(dataItem.get()));
status.withCached(cachedStream);
}

getSourceTable().findAllAsync(query)
.whenComplete((result, throwable) -> {
if (throwable != null) {
Expand All @@ -274,6 +303,7 @@ public FindAllStatus<K, T> findAll(Query query) {
// 'complete' the operation with the bulk iterable
status.completeSuccessfully(result);
});

return status;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import slatepowered.inset.query.FindStatus;
import slatepowered.inset.query.Query;

import java.util.concurrent.CompletableFuture;
Expand Down
Loading

0 comments on commit e315eaa

Please sign in to comment.