Skip to content

Commit

Permalink
renamed FoundItem, qol, delete items
Browse files Browse the repository at this point in the history
  • Loading branch information
orbyfied committed Dec 10, 2023
1 parent 312f5ba commit 0c73c60
Show file tree
Hide file tree
Showing 14 changed files with 134 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public void remove(DataItem<K, T> item) {
cache.invalidate(item.key());
}

@Override
public void remove(K key) {
cache.invalidate(key);
}

@Override
public int size() {
return cache.asMap().size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ public interface DataCache<K, T> extends Iterable<DataItem<K, T>> {
*/
void remove(DataItem<K, T> item);

/**
* Remove the given key from this cache.
*
* @param key The key.
*/
void remove(K key);

/**
* Get the count of items in this cache.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ public void remove(DataItem<K, T> item) {
list.remove(item);
}

@Override
public void remove(K key) {
map.remove(key);
list.removeIf(item -> item.key().equals(key));
}

@Override
public Iterator<DataItem<K, T>> iterator() {
return list.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import slatepowered.inset.codec.*;
import slatepowered.inset.internal.ProjectionInterface;
import slatepowered.inset.operation.Sorting;
import slatepowered.inset.query.FindAllOperation;
import slatepowered.inset.query.FoundItem;
import slatepowered.inset.query.FindOperation;
import slatepowered.inset.query.FindResult;
import slatepowered.inset.query.PartialItem;
import slatepowered.inset.query.Query;
import slatepowered.inset.source.DataSourceFindResult;
import slatepowered.inset.source.DataTable;

import java.lang.reflect.Type;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
Expand All @@ -21,7 +21,7 @@
* @param <K> The primary key type.
* @param <T> The data type.
*/
public class DataItem<K, T> extends FoundItem<K, T> {
public class DataItem<K, T> extends PartialItem<K, T> {

public DataItem(Datastore<K, T> datastore, K key) {
this.datastore = datastore;
Expand Down Expand Up @@ -149,11 +149,18 @@ public long lastReferenceTime() {
*
* @return This.
*/
@Override
public DataItem<K, T> dispose() {
datastore.getDataCache().remove(this);
return this;
}

@Override
public DataItem<K, T> delete() {
super.delete();
return this;
}

/**
* Create a new default value for this item if the value is absent.
*
Expand Down Expand Up @@ -273,8 +280,8 @@ public <V> V getField(String fieldName, Type expectedType) {
}

@Override
public DataItem<K, T> fetch() {
return this;
public FindOperation<K, T> find() {
return new FindOperation<>(datastore, null).completeSuccessfully(FindResult.CACHED, this);
}

@Override
Expand All @@ -293,7 +300,7 @@ protected <V> V projectInterface(ProjectionInterface projectionInterface) {
*
* @return The future.
*/
public CompletableFuture<DataItem<K, T>> fetchAsync() {
public CompletableFuture<DataItem<K, T>> findAsync() {
return datastore.getSourceTable()
.findOneAsync(Query.byKey(key))
.thenApply(result -> this.decode(result.input()).fetchedNow());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
import slatepowered.inset.operation.FieldOrderSorting;
import slatepowered.inset.operation.FieldOrdering;
import slatepowered.inset.operation.Sorting;
import slatepowered.inset.query.FoundItem;
import slatepowered.inset.query.PartialItem;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;
Expand All @@ -25,8 +24,8 @@ public final class CachedStreams {
* @param <T> The value type.
* @return The stream.
*/
public static <K, T> Stream<? extends FoundItem<K, T>> zipStreamsDistinct(Stream<? extends FoundItem<K, T>> priorityStream,
Stream<? extends FoundItem<K, T>> addedStream) {
public static <K, T> Stream<? extends PartialItem<K, T>> zipStreamsDistinct(Stream<? extends PartialItem<K, T>> priorityStream,
Stream<? extends PartialItem<K, T>> addedStream) {
if (priorityStream == null) return addedStream;
if (addedStream == null) return priorityStream;
return Stream.concat(priorityStream, addedStream).distinct();
Expand All @@ -42,10 +41,10 @@ public static <K, T> Stream<? extends FoundItem<K, T>> zipStreamsDistinct(Stream
* @param <K> The key type.
* @param <T> The value type.
*/
public static <K, T> Stream<? extends FoundItem<K, T>> sortStream(Datastore<K, T> datastore,
Stream<? extends FoundItem<K, T>> stream,
Sorting sorting) {
Comparator<FoundItem<K, T>> fastComparator = createFastComparator(datastore, sorting);
public static <K, T> Stream<? extends PartialItem<K, T>> sortStream(Datastore<K, T> datastore,
Stream<? extends PartialItem<K, T>> stream,
Sorting sorting) {
Comparator<PartialItem<K, T>> fastComparator = createFastComparator(datastore, sorting);
return stream.sorted(fastComparator);
}

Expand All @@ -59,8 +58,8 @@ public static <K, T> Stream<? extends FoundItem<K, T>> sortStream(Datastore<K, T
* @param <T> The value type.
* @return The comparator.
*/
public static <K, T> Comparator<FoundItem<K, T>> createFastComparator(final Datastore<K, T> datastore,
final Sorting sorting) {
public static <K, T> Comparator<PartialItem<K, T>> createFastComparator(final Datastore<K, T> datastore,
final Sorting sorting) {
if (!(sorting instanceof FieldOrderSorting)) {
throw new UnsupportedOperationException("Unsupported sorting type for fast comparator: " + sorting.getClass().getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import lombok.Builder;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import slatepowered.inset.codec.DataCodec;
import slatepowered.inset.datastore.DataItem;
import slatepowered.inset.datastore.Datastore;
import slatepowered.inset.datastore.OperationStatus;
Expand All @@ -14,7 +12,6 @@
import slatepowered.inset.operation.Sorting;
import slatepowered.inset.source.DataSourceBulkIterable;
import slatepowered.inset.source.SourceFoundItem;
import sun.security.util.Cache;

import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -63,10 +60,10 @@ public static class Options {
/**
* The stream of items.
*/
protected Stream<? extends FoundItem<K, T>> stream;
protected Stream<? extends PartialItem<K, T>> stream;

// The cached stream iterator
protected Iterator<? extends FoundItem<K, T>> streamIterator;
protected Iterator<? extends PartialItem<K, T>> streamIterator;

/**
* The options passed on this operation.
Expand All @@ -80,13 +77,13 @@ public FindAllOperation(Datastore<K, T> datastore, Query query, Options options)
}

// update the current stream instance to the given instance
private synchronized void updateStream(Stream<? extends FoundItem<K, T>> stream) {
private synchronized void updateStream(Stream<? extends PartialItem<K, T>> stream) {
this.stream = stream;
}

// ensure the presence of a usable stream iterator to apply
// to the current output stream, this is a terminal operation
private Iterator<? extends FoundItem<K, T>> streamIterator() {
private Iterator<? extends PartialItem<K, T>> streamIterator() {
if (streamIterator == null) {
this.streamIterator = stream.iterator();
}
Expand Down Expand Up @@ -280,7 +277,7 @@ private SourceFoundItem<K, T> qualify(SourceFoundItem<?, ?> item) {
*
* @return The item.
*/
public Optional<? extends FoundItem<K, T>> next() {
public Optional<? extends PartialItem<K, T>> next() {
return cachedStream != null ?
streamIterator().hasNext() ? Optional.of(streamIterator().next()) : Optional.empty() :
iterable.next().map(this::qualify);
Expand All @@ -296,7 +293,7 @@ public Optional<? extends FoundItem<K, T>> next() {
*
* @return The item.
*/
public CompletableFuture<Optional<? extends FoundItem<K, T>>> nextAsync() {
public CompletableFuture<Optional<? extends PartialItem<K, T>>> nextAsync() {
return async(this::next);
}

Expand All @@ -310,7 +307,7 @@ public CompletableFuture<Optional<? extends FoundItem<K, T>>> nextAsync() {
*
* @return The item.
*/
public Optional<? extends FoundItem<K, T>> first() {
public Optional<? extends PartialItem<K, T>> first() {
return cachedStream != null ?
stream.findFirst() :
iterable.first().map(this::qualify);
Expand All @@ -325,7 +322,7 @@ public Optional<? extends FoundItem<K, T>> first() {
*
* @return The item.
*/
public CompletableFuture<Optional<? extends FoundItem<K, T>>> firstAsync() {
public CompletableFuture<Optional<? extends PartialItem<K, T>>> firstAsync() {
return async(this::first);
}

Expand All @@ -340,7 +337,7 @@ public CompletableFuture<Optional<? extends FoundItem<K, T>>> firstAsync() {
* @return The list of items.
*/
@SuppressWarnings("unchecked")
public List<? extends FoundItem<K, T>> list() {
public List<? extends PartialItem<K, T>> list() {
if (cachedStream != null) {
return stream.collect(Collectors.toList());
}
Expand All @@ -362,7 +359,7 @@ public List<? extends FoundItem<K, T>> list() {
*
* @return The list of items.
*/
public CompletableFuture<List<? extends FoundItem<K, T>>> listAsync() {
public CompletableFuture<List<? extends PartialItem<K, T>>> listAsync() {
return async(this::list);
}

Expand All @@ -374,7 +371,7 @@ public CompletableFuture<List<? extends FoundItem<K, T>>> listAsync() {
*
* @return The stream of items.
*/
public Stream<? extends FoundItem<K, T>> stream() {
public Stream<? extends PartialItem<K, T>> stream() {
return stream;
}

Expand All @@ -385,7 +382,7 @@ public Stream<? extends FoundItem<K, T>> stream() {
* @param consumer The consumer.
* @return This.
*/
public FindAllOperation<K, T> peek(Consumer<FoundItem<K, T>> consumer) {
public FindAllOperation<K, T> peek(Consumer<PartialItem<K, T>> consumer) {
stream = stream.peek(consumer);
return this;
}
Expand Down Expand Up @@ -428,7 +425,7 @@ protected synchronized FindAllOperation<K, T> completeInternal(DataSourceBulkIte
this.sourceHadAny = iterable.hasNext();

// update stream with iterable items
Stream<FoundItem<K, T>> iterableStream = iterable.stream().map(this::qualify);
Stream<PartialItem<K, T>> iterableStream = iterable.stream().map(this::qualify);
updateStream(CachedStreams.zipStreamsDistinct(this.stream, iterableStream));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,26 @@ public FindOperation(Datastore<K, T> datastore, Query query) {
super(datastore, query);
}

/**
* Await completion of this operation, blocking this thread,
* then get the data item if present, otherwise return null.
*
* @return The data item if present.
*/
public DataItem<K, T> awaitItem() {
return await().item();
}

/**
* Await completion of this operation, blocking this thread,
* then get the data item if present, otherwise return an empty optional.
*
* @return The data item in an optional if present.
*/
public Optional<DataItem<K, T>> awaitOptional() {
return await().optional();
}

/**
* When this query is successfully completed, call the given consumer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
import java.util.concurrent.CompletableFuture;

/**
* Represents a found item in a {@link FindAllOperation}.
* Represents a potentially partial item gotten as a result of
* either a cache hit or as a returned item from a data source operation.
*
* @param <K> The key type.
* @param <T> The value type.
*/
public abstract class FoundItem<K, T> {
public abstract class PartialItem<K, T> {

protected DecodeInput cachedInput; // The cached input, used by this class to read partial data
private double[] cachedOrderCoefficients; // The cached order coefficient array
Expand Down Expand Up @@ -96,21 +97,22 @@ public K getKey() {
public abstract <V> V getField(String fieldName, Type expectedType);

/**
* Fetch a data item from the database if this result was partial,
* Find or fetch a data item from the database if this result was partial,
* otherwise ensure it is cached.
*
* @return The data item.
*/
public abstract DataItem<K, T> fetch();
public abstract FindOperation<K, T> find();

/**
* Asynchronously fetch a data item from the database if this result was partial,
* otherwise ensure it is cached.
* If this item is complete, synchronize this data with the cache,
* otherwise fetch the complete data from the database and update it
* in the cache.
*
* @return The data item.
*/
public CompletableFuture<DataItem<K, T>> fetchAsync() {
return CompletableFuture.supplyAsync(this::fetch, assertQualified().getDataManager().getExecutorService());
public FindOperation<K, T> fetch() {
return find().thenFetchIfCached();
}

/**
Expand Down Expand Up @@ -168,6 +170,28 @@ public <V> V project(Class<V> vClass) {
*/
protected abstract <V> V projectInterface(ProjectionInterface projectionInterface);

/**
* Drop this item from the cache if cached.
*
* @return This.
*/
public PartialItem<K, T> dispose() {
assertQualified().getDataCache().remove(getKey());
return this;
}

/**
* Delete this item from the database and drop it from the cache.
*
* @return This.
*/
public PartialItem<K, T> delete() {
Datastore<K, T> datastore = assertQualified();
datastore.getSourceTable().deleteOne(Query.byKey(getKey()));
datastore.getDataCache().remove(getKey());
return this;
}

// project the partial data into the given class
protected <V> V projectDataClass(DataCodec<K, V> codec) {
Datastore<K, T> datastore = assertQualified();
Expand All @@ -178,8 +202,8 @@ protected <V> V projectDataClass(DataCodec<K, V> codec) {
@Override
public boolean equals(Object other) {
if (other == this) return true;
if (!(other instanceof FoundItem)) return false;
FoundItem<?, ?> otherItem = (FoundItem<?, ?>) other;
if (!(other instanceof PartialItem)) return false;
PartialItem<?, ?> otherItem = (PartialItem<?, ?>) other;

// compare keys
return getKey().equals(otherItem.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import slatepowered.inset.operation.Projection;
import slatepowered.inset.operation.Sorting;
import slatepowered.inset.query.FoundItem;
import slatepowered.inset.query.Query;

import java.util.List;
Expand Down
Loading

0 comments on commit 0c73c60

Please sign in to comment.