Skip to content

Commit

Permalink
Running and trailing average of BigDecimal (#7)
Browse files Browse the repository at this point in the history
+ Add general WithOriginal object and Gatherer to carry an original value and a mapped value (an average, in this case).
+ Add function to GathererUtils to throw an IllegalArgumentException when a parameter is null, rather than use Objects.requireNotNull and get a NullPointerException.
+ Averaging of BigDecimal with ability to specify a positive number of trailing values to consider, whether to emit partially calculated values for a trailing average, and the ability to change the RoudingMode and MathContext for mathematical operations.
  • Loading branch information
tginsberg authored Jul 16, 2024
1 parent 8356551 commit 12eec9c
Show file tree
Hide file tree
Showing 8 changed files with 672 additions and 2 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,28 @@ TBD, once I start publishing snapshots to Maven Central.

(Example, TODO clean this up)

**Running Average**

```java
Stream
.of(new BigDecimal("1.0"), new BigDecimal("2.0"), new BigDecimal("10.0"))
.gather(Gatherers4j.averageBigDecimals())
.toList();

// [1, 1.5, 4.3333333333333333]
```

**Trailing Average**

```java
Stream
.of(new BigDecimal("1.0"), new BigDecimal("2.0"), new BigDecimal("10.0"), new BigDecimal("20.0"), new BigDecimal("30.0"))
.gather(Gatherers4j.averageBigDecimals().trailing(2))
.toList();

// [1.5, 6, 15, 25]
```


**Removing consecutive duplicate elements:**

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2024 Todd Ginsberg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ginsberg.gatherers4j;

import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

import static com.ginsberg.gatherers4j.GathererUtils.mustNotBeNull;

public class AveragingBigDecimalGatherer<INPUT>
implements Gatherer<INPUT, AveragingBigDecimalGatherer.State, BigDecimal> {

private final Function<INPUT, BigDecimal> mappingFunction;
private RoundingMode roundingMode = RoundingMode.HALF_UP;
private MathContext mathContext = MathContext.DECIMAL64;
private BigDecimal nullReplacement;
private int trailingCount = 1;
private boolean includePartialValues;

AveragingBigDecimalGatherer(final Function<INPUT, BigDecimal> mappingFunction) {
super();
this.mappingFunction = mappingFunction;
}

@Override
public Supplier<State> initializer() {
return trailingCount == 1 ? State::new : () -> new TrailingState(trailingCount);
}

@Override
public Integrator<AveragingBigDecimalGatherer.State, INPUT, BigDecimal> integrator() {
return (state, element, downstream) -> {
final BigDecimal mappedElement = element == null ? nullReplacement : mappingFunction.apply(element);
if (mappedElement != null) {
state.add(mappedElement, mathContext);
if (state.canCalculate(includePartialValues)) {
return downstream.push(state.average(roundingMode, mathContext.getPrecision()));
}
}
return !downstream.isRejecting();
};
}

public AveragingBigDecimalGatherer<INPUT> trailing(int count) {
if (count <= 0) {
throw new IllegalArgumentException("Trailing count must be positive");
}
trailingCount = count;
return this;
}

public AveragingBigDecimalGatherer<INPUT> includePartialTailingValues() {
includePartialValues = true;
return this;
}

public AveragingBigDecimalGatherer<INPUT> treatNullAsZero() {
return treatNullAs(BigDecimal.ZERO);
}

public AveragingBigDecimalGatherer<INPUT> treatNullAs(final BigDecimal rule) {
this.nullReplacement = rule;
return this;
}

public AveragingBigDecimalGatherer<INPUT> withMathContext(final MathContext mathContext) {
mustNotBeNull(mathContext, "MathContext must not be null");
this.mathContext = mathContext;
return this;
}

public AveragingBigDecimalGatherer<INPUT> withRoundingMode(final RoundingMode roundingMode) {
mustNotBeNull(roundingMode, "RoundingMode must not be null");
this.roundingMode = roundingMode;
return this;
}

public WithOriginalGatherer<INPUT, State, BigDecimal> withOriginal() {
return new WithOriginalGatherer<>(this);
}

public static class State {
long count;
BigDecimal sum = BigDecimal.ZERO;

void add(final BigDecimal element, final MathContext mathContext) {
count++;
sum = sum.add(element, mathContext);
}

boolean canCalculate(final boolean allowPartial) {
return true;
}

BigDecimal average(final RoundingMode roundingMode, int precision) {
if (sum.equals(BigDecimal.ZERO)) {
return BigDecimal.ZERO;
} else {
return sum.divide(BigDecimal.valueOf(count), precision, roundingMode);
}
}
}

public static class TrailingState extends State {
final BigDecimal[] series;
int index = 0;

private TrailingState(int lookBack) {
this.series = new BigDecimal[lookBack];
Arrays.fill(series, BigDecimal.ZERO);
}

@Override
boolean canCalculate(final boolean allowPartial) {
return allowPartial || count >= series.length;
}

@Override
void add(final BigDecimal element, final MathContext mathContext) {
sum = sum.subtract(series[index]).add(element, mathContext);
series[index] = element;
index = (index + 1) % series.length;
if (count < series.length) {
count++;
}
}
}
}
9 changes: 8 additions & 1 deletion src/main/java/com/ginsberg/gatherers4j/GathererUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@
package com.ginsberg.gatherers4j;

public class GathererUtils {
public static boolean safeEquals(Object left, Object right) {

public static boolean safeEquals(final Object left, final Object right) {
if (left == null && right == null) {
return true;
} else if (left == null || right == null) {
return false;
}
return left.equals(right);
}

public static void mustNotBeNull(final Object subject, final String message) {
if (subject == null) {
throw new IllegalArgumentException(message);
}
}
}
23 changes: 23 additions & 0 deletions src/main/java/com/ginsberg/gatherers4j/Gatherers4j.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.ginsberg.gatherers4j;

import java.math.BigDecimal;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
Expand All @@ -24,6 +25,28 @@

public class Gatherers4j {

/**
* Create a Stream that is the running average of <code>Stream&lt;BigDecimal&gt;</code>
*
* @return AveragingBigDecimalGatherer
*/
public static AveragingBigDecimalGatherer<BigDecimal> averageBigDecimals() {
return new AveragingBigDecimalGatherer<>(Function.identity());
}

/**
* Create a Stream that is the running average of <code>BigDecimal</code> objects as mapped by
* the given function. This is useful when paired with the <code>withOriginal</code> function.
*
* @param mappingFunction A non-null function to map the <code>INPUT</code> type to <code>BigDecimal</code>
* @return AveragingBigDecimalGatherer
*/
public static <INPUT> AveragingBigDecimalGatherer<INPUT> averageBigDecimalsBy(
final Function<INPUT, BigDecimal> mappingFunction
) {
return new AveragingBigDecimalGatherer<>(mappingFunction);
}

/**
* <p>Given a stream of objects, filter the objects such that any consecutively appearing
* after the first one are dropped.
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/ginsberg/gatherers4j/WithOriginal.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2024 Todd Ginsberg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ginsberg.gatherers4j;

public record WithOriginal<ORIGINAL, CALCULATED>(ORIGINAL original, CALCULATED calculated) {
}
62 changes: 62 additions & 0 deletions src/main/java/com/ginsberg/gatherers4j/WithOriginalGatherer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2024 Todd Ginsberg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ginsberg.gatherers4j;

import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

public class WithOriginalGatherer<INPUT, STATE, OUTPUT>
implements Gatherer<INPUT, STATE, WithOriginal<INPUT, OUTPUT>> {

private final Gatherer<INPUT, STATE, OUTPUT> delegate;

WithOriginalGatherer(final Gatherer<INPUT, STATE, OUTPUT> delegate) {
this.delegate = delegate;
}

@Override
public Supplier<STATE> initializer() {
return delegate.initializer();
}

@Override
public Integrator<STATE, INPUT, WithOriginal<INPUT, OUTPUT>> integrator() {
final CapturingDownstream<OUTPUT> capturingDownstream = new CapturingDownstream<>();
final Integrator<STATE, INPUT, OUTPUT> delegateIntegrator = delegate.integrator();

return (state, element, downstream) -> {
final boolean response = delegateIntegrator.integrate(state, element, capturingDownstream);
while (!capturingDownstream.captured.isEmpty()) {
downstream.push(new WithOriginal<>(element, capturingDownstream.captured.poll()));
}
return response;
};
}

private static class CapturingDownstream<OUTPUT> implements Downstream<OUTPUT> {

private final Deque<OUTPUT> captured = new ConcurrentLinkedDeque<>();

@Override
public boolean push(final OUTPUT capturedElement) {
captured.push(capturedElement);
return true; // Unused
}
}
}
Loading

0 comments on commit 12eec9c

Please sign in to comment.