Skip to content

Commit

Permalink
Created IStateHolder and have consumers, producers, and readers imple…
Browse files Browse the repository at this point in the history
…ment that instead of IState
  • Loading branch information
blankensteiner committed Dec 11, 2024
1 parent 067daa7 commit 7aa8d50
Show file tree
Hide file tree
Showing 33 changed files with 363 additions and 400 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed

- **Breaking**: The consumer, reader, and producer now implements IStateHolder instead of IState

## [3.6.0] - 2024-12-09

### Added
Expand Down
2 changes: 1 addition & 1 deletion src/DotPulsar/Abstractions/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace DotPulsar.Abstractions;
/// <summary>
/// A consumer abstraction.
/// </summary>
public interface IConsumer : IGetLastMessageIds, ISeek, IState<ConsumerState>, IAsyncDisposable
public interface IConsumer : IGetLastMessageIds, ISeek, IStateHolder<ConsumerState>, IAsyncDisposable
{
/// <summary>
/// Acknowledge the consumption of a single message using the MessageId.
Expand Down
2 changes: 1 addition & 1 deletion src/DotPulsar/Abstractions/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace DotPulsar.Abstractions;
/// <summary>
/// A producer abstraction.
/// </summary>
public interface IProducer : IState<ProducerState>, IAsyncDisposable
public interface IProducer : IStateHolder<ProducerState>, IAsyncDisposable
{
/// <summary>
/// The producer's service url.
Expand Down
2 changes: 1 addition & 1 deletion src/DotPulsar/Abstractions/IReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace DotPulsar.Abstractions;
/// <summary>
/// A reader abstraction.
/// </summary>
public interface IReader : IGetLastMessageIds, ISeek, IState<ReaderState>, IAsyncDisposable
public interface IReader : IGetLastMessageIds, ISeek, IStateHolder<ReaderState>, IAsyncDisposable
{
/// <summary>
/// The reader's service url.
Expand Down
26 changes: 26 additions & 0 deletions src/DotPulsar/Abstractions/IStateHolder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Abstractions;

/// <summary>
/// A state holder abstraction.
/// </summary>
public interface IStateHolder<TState> where TState : notnull
{
/// <summary>
/// The state abstraction of the holder.
/// </summary>
IState<TState> State { get; }
}
8 changes: 4 additions & 4 deletions src/DotPulsar/Extensions/ConsumerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static async ValueTask Process<TMessage>(
/// </remarks>
public static async ValueTask<ConsumerStateChanged> StateChangedTo(this IConsumer consumer, ConsumerState state, CancellationToken cancellationToken = default)
{
var currentState = await consumer.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
var currentState = await consumer.State.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
return new ConsumerStateChanged(consumer, currentState);
}

Expand All @@ -82,7 +82,7 @@ public static async ValueTask<ConsumerStateChanged> StateChangedTo(this IConsume
/// </remarks>
public static async ValueTask<ConsumerStateChanged> StateChangedTo(this IConsumer consumer, ConsumerState state, TimeSpan delay, CancellationToken cancellationToken = default)
{
var currentState = await consumer.OnStateChangeTo(state, delay, cancellationToken).ConfigureAwait(false);
var currentState = await consumer.State.OnStateChangeTo(state, delay, cancellationToken).ConfigureAwait(false);
return new ConsumerStateChanged(consumer, currentState);
}

Expand All @@ -97,7 +97,7 @@ public static async ValueTask<ConsumerStateChanged> StateChangedTo(this IConsume
/// </remarks>
public static async ValueTask<ConsumerStateChanged> StateChangedFrom(this IConsumer consumer, ConsumerState state, CancellationToken cancellationToken = default)
{
var currentState = await consumer.OnStateChangeFrom(state, cancellationToken).ConfigureAwait(false);
var currentState = await consumer.State.OnStateChangeFrom(state, cancellationToken).ConfigureAwait(false);
return new ConsumerStateChanged(consumer, currentState);
}

Expand All @@ -112,7 +112,7 @@ public static async ValueTask<ConsumerStateChanged> StateChangedFrom(this IConsu
/// </remarks>
public static async ValueTask<ConsumerStateChanged> StateChangedFrom(this IConsumer consumer, ConsumerState state, TimeSpan delay, CancellationToken cancellationToken = default)
{
var currentState = await consumer.OnStateChangeFrom(state, delay, cancellationToken).ConfigureAwait(false);
var currentState = await consumer.State.OnStateChangeFrom(state, delay, cancellationToken).ConfigureAwait(false);
return new ConsumerStateChanged(consumer, currentState);
}
}
8 changes: 4 additions & 4 deletions src/DotPulsar/Extensions/ProducerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static IMessageBuilder<TMessage> NewMessage<TMessage>(this IProducer<TMes
/// </remarks>
public static async ValueTask<ProducerStateChanged> StateChangedTo(this IProducer producer, ProducerState state, CancellationToken cancellationToken = default)
{
var currentState = await producer.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
var currentState = await producer.State.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
return new ProducerStateChanged(producer, currentState);
}

Expand All @@ -54,7 +54,7 @@ public static async ValueTask<ProducerStateChanged> StateChangedTo(this IProduce
/// </remarks>
public static async ValueTask<ProducerStateChanged> StateChangedTo(this IProducer producer, ProducerState state, TimeSpan delay, CancellationToken cancellationToken = default)
{
var currentState = await producer.OnStateChangeTo(state, delay, cancellationToken).ConfigureAwait(false);
var currentState = await producer.State.OnStateChangeTo(state, delay, cancellationToken).ConfigureAwait(false);
return new ProducerStateChanged(producer, currentState);
}

Expand All @@ -69,7 +69,7 @@ public static async ValueTask<ProducerStateChanged> StateChangedTo(this IProduce
/// </remarks>
public static async ValueTask<ProducerStateChanged> StateChangedFrom(this IProducer producer, ProducerState state, CancellationToken cancellationToken = default)
{
var currentState = await producer.OnStateChangeFrom(state, cancellationToken).ConfigureAwait(false);
var currentState = await producer.State.OnStateChangeFrom(state, cancellationToken).ConfigureAwait(false);
return new ProducerStateChanged(producer, currentState);
}

Expand All @@ -84,7 +84,7 @@ public static async ValueTask<ProducerStateChanged> StateChangedFrom(this IProdu
/// </remarks>
public static async ValueTask<ProducerStateChanged> StateChangedFrom(this IProducer producer, ProducerState state, TimeSpan delay, CancellationToken cancellationToken = default)
{
var currentState = await producer.OnStateChangeFrom(state, delay, cancellationToken).ConfigureAwait(false);
var currentState = await producer.State.OnStateChangeFrom(state, delay, cancellationToken).ConfigureAwait(false);
return new ProducerStateChanged(producer, currentState);
}
}
8 changes: 4 additions & 4 deletions src/DotPulsar/Extensions/ReaderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static class ReaderExtensions
/// </remarks>
public static async ValueTask<ReaderStateChanged> StateChangedTo(this IReader reader, ReaderState state, CancellationToken cancellationToken = default)
{
var currentState = await reader.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
var currentState = await reader.State.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
return new ReaderStateChanged(reader, currentState);
}

Expand All @@ -47,7 +47,7 @@ public static async ValueTask<ReaderStateChanged> StateChangedTo(this IReader re
/// </remarks>
public static async ValueTask<ReaderStateChanged> StateChangedTo(this IReader reader, ReaderState state, TimeSpan delay, CancellationToken cancellationToken = default)
{
var currentState = await reader.OnStateChangeTo(state, delay, cancellationToken).ConfigureAwait(false);
var currentState = await reader.State.OnStateChangeTo(state, delay, cancellationToken).ConfigureAwait(false);
return new ReaderStateChanged(reader, currentState);
}

Expand All @@ -62,7 +62,7 @@ public static async ValueTask<ReaderStateChanged> StateChangedTo(this IReader re
/// </remarks>
public static async ValueTask<ReaderStateChanged> StateChangedFrom(this IReader reader, ReaderState state, CancellationToken cancellationToken = default)
{
var currentState = await reader.OnStateChangeFrom(state, cancellationToken).ConfigureAwait(false);
var currentState = await reader.State.OnStateChangeFrom(state, cancellationToken).ConfigureAwait(false);
return new ReaderStateChanged(reader, currentState);
}

Expand All @@ -77,7 +77,7 @@ public static async ValueTask<ReaderStateChanged> StateChangedFrom(this IReader
/// </remarks>
public static async ValueTask<ReaderStateChanged> StateChangedFrom(this IReader reader, ReaderState state, TimeSpan delay, CancellationToken cancellationToken = default)
{
var currentState = await reader.OnStateChangeFrom(state, delay, cancellationToken).ConfigureAwait(false);
var currentState = await reader.State.OnStateChangeFrom(state, delay, cancellationToken).ConfigureAwait(false);
return new ReaderStateChanged(reader, currentState);
}
}
108 changes: 0 additions & 108 deletions src/DotPulsar/Extensions/StateExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,112 +100,4 @@ public static async ValueTask<TState> OnStateChangeFrom<TState>(
}
}
}

/// <summary>
/// Will invoke the onStateLeft callback when the state if left (with delay) and onStateReached when it's reached again.
/// </summary>
/// <returns>
/// ValueTask that will run as long as a final state is not entered.
/// </returns>
public static async ValueTask DelayedStateMonitor<TEntity, TState, TFaultContext>(
this TEntity stateImplementer,
TState state,
TimeSpan delay,
Func<TEntity, TState, CancellationToken, ValueTask<TFaultContext>> onStateLeft,
Func<TEntity, TState, TFaultContext, CancellationToken, ValueTask> onStateReached,
CancellationToken cancellationToken = default) where TEntity : IState<TState> where TState : notnull where TFaultContext : class
{
while (true)
{
cancellationToken.ThrowIfCancellationRequested();

var currentState = await stateImplementer.OnStateChangeFrom(state, delay, cancellationToken).ConfigureAwait(false);

TFaultContext? faultContext = null;

try
{
faultContext = await onStateLeft(stateImplementer, currentState, cancellationToken).ConfigureAwait(false);
}
catch
{
// Ignore
}

if (stateImplementer.IsFinalState(currentState))
return;

currentState = await stateImplementer.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);

if (stateImplementer.IsFinalState(currentState))
return;

try
{
if (faultContext is not null)
await onStateReached(stateImplementer, currentState, faultContext, cancellationToken).ConfigureAwait(false);
}
catch
{
// Ignore
}
}
}

/// <summary>
/// Will invoke the onStateLeft callback when the state if left (with delay) and onStateReached when it's reached again.
/// </summary>
/// <returns>
/// ValueTask that will run as long as a final state is not entered.
/// </returns>
public static async ValueTask DelayedStateMonitor<TEntity, TState>(
this TEntity stateImplementer,
TState state,
TimeSpan delay,
Func<TEntity, TState, CancellationToken, ValueTask> onStateLeft,
Func<TEntity, TState, CancellationToken, ValueTask> onStateReached,
CancellationToken cancellationToken = default) where TEntity : IState<TState> where TState : notnull
{
async ValueTask<string> onStateLeftFunction(TEntity entity, TState state, CancellationToken cancellationToken)
{
await onStateLeft(entity, state, cancellationToken).ConfigureAwait(false);
return string.Empty;
}

async ValueTask onStateReachedFunction(TEntity entity, TState state, string faultContext, CancellationToken cancellationToken)
{
await onStateReached(entity, state, cancellationToken).ConfigureAwait(false);
}

await stateImplementer.DelayedStateMonitor(state, delay, onStateLeftFunction, onStateReachedFunction, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Will invoke the onStateLeft callback when the state if left (with delay) and onStateReached when it's reached again.
/// </summary>
/// <returns>
/// ValueTask that will run as long as a final state is not entered.
/// </returns>
public static async ValueTask DelayedStateMonitor<TEntity, TState>(
this TEntity stateImplementer,
TState state,
TimeSpan delay,
Action<TEntity, TState> onStateLeft,
Action<TEntity, TState> onStateReached,
CancellationToken cancellationToken = default) where TEntity : IState<TState> where TState : notnull
{
ValueTask<string> onStateLeftFunction(TEntity entity, TState state, CancellationToken cancellationToken)
{
onStateLeft(entity, state);
return new ValueTask<string>(string.Empty);
}

ValueTask onStateReachedFunction(TEntity entity, TState state, string faultContext, CancellationToken cancellationToken)
{
onStateReached(entity, state);
return new ValueTask();
}

await stateImplementer.DelayedStateMonitor(state, delay, onStateLeftFunction, onStateReachedFunction, cancellationToken).ConfigureAwait(false);
}
}
Loading

0 comments on commit 7aa8d50

Please sign in to comment.