
BackgroundTasks: State pattern

ronimizy edited this page Jan 27, 2024 · 37 revisions

When implementing complex, multi-stage business scenarios as background tasks, it is easy to end up with a giant, hard-to-comprehend mess of code. The root cause is that the underlying decomposition problem has no obvious solution.

Two solutions come to mind right away: encapsulate each stage in its own service, or create a chain of background tasks (encapsulating each stage in a separate task).

The first solution is possible via the execution metadata persistence feature of Itmo.Dev.Platform.BackgroundTasks. You can store some data or enum flags to determine the task's execution stage. However, it does not solve the decomposition problem nicely: orchestration code still has to be present in the task implementation, and it can be bulky and hard to comprehend as well.
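For contrast, here is a minimal, self-contained sketch of that first approach, assuming a hypothetical import scenario: ImportStage, ImportExecutionMetadata and MonolithicImportTask are illustrative names, not library types. The stage flag is persisted, but every stage transition still lives in a single method of the task:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for persisted execution metadata:
// a single enum flag records which stage the task has reached.
public enum ImportStage { NotStarted, DataLoaded, DataValidated, Completed }

public sealed class ImportExecutionMetadata
{
    public ImportStage Stage { get; set; } = ImportStage.NotStarted;
}

// All orchestration lives in one method: every new stage adds another branch
// here, and any stage-specific data has to be threaded through this class.
public sealed class MonolithicImportTask
{
    public List<string> Log { get; } = new();

    public void Execute(ImportExecutionMetadata metadata)
    {
        while (metadata.Stage != ImportStage.Completed)
        {
            switch (metadata.Stage)
            {
                case ImportStage.NotStarted:
                    Log.Add("load");
                    metadata.Stage = ImportStage.DataLoaded;
                    break;
                case ImportStage.DataLoaded:
                    Log.Add("validate");
                    metadata.Stage = ImportStage.DataValidated;
                    break;
                case ImportStage.DataValidated:
                    Log.Add("save");
                    metadata.Stage = ImportStage.Completed;
                    break;
                default:
                    throw new InvalidOperationException($"Unknown stage {metadata.Stage}");
            }
        }
    }
}
```

Adding a stage means editing both the enum and this one switch, which is the orchestration bulk described above.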

The second solution is probably even worse, as it leads to low cohesion between scenario stages. In the long term, the process as a whole becomes fragmented. Decomposition into separate tasks makes it cognitively harder to interpret them as a single process; furthermore, the semantics of individual tasks could signal that the stages can be run independently (which they probably cannot), aggravating the issue.

An alternative is to use the State pattern and decompose each execution stage into its own state. This wiki page describes the core concepts and implementation details of using the State pattern with Itmo.Dev.Platform.BackgroundTasks.

Sample code (implementation details are described further in this article)

Idea

The main idea of the implementation is built around two entities: the task state and the state handler.

Entities

A task state denotes a stage in task execution. The state object must be stored in the task's execution metadata so that state changes persist between stage executions. A task state must store the data required to execute its stage. If one stage passes data to another, it has to be done through states.

State handlers implement task execution stages. They receive the handled state itself and some context (usually the task id and metadata). As states are stored inside execution metadata, they can be mutable if you need to persist some stage context in case the service is stopped. For example, when loading paginated data, you can store a cursor in the state and update it after each page is processed; if the application is stopped, the cursor will be restored after it restarts.
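A minimal sketch of the pagination example, assuming hypothetical names (LoadPagesState, ProcessTotalsState, LoadPagesStateHandler) and an in-memory list of pages instead of a real data source. It does not use the library types, and for brevity the handler returns the next state directly instead of a StateHandlerResult. Both the cursor and the running total live in the state, so whatever is persisted with the execution metadata is enough to resume:

```csharp
#nullable enable
using System.Collections.Generic;
using System.Linq;

// Mutable stage state: Cursor and RunningTotal are updated after every page,
// so a persisted copy lets a restarted task resume where it stopped.
public sealed record LoadPagesState
{
    public int Cursor { get; set; }
    public int RunningTotal { get; set; }
}

// Next stage; receives the previous stage's output through its own data,
// since states are the channel for passing data between stages.
public sealed record ProcessTotalsState(int Total);

public sealed class LoadPagesStateHandler
{
    private readonly IReadOnlyList<int[]> _pages; // hypothetical paginated source

    public LoadPagesStateHandler(IReadOnlyList<int[]> pages) => _pages = pages;

    // Processes pages starting at state.Cursor. Returns the next state when all
    // pages are done, or null when stopAfterPages simulates the service stopping.
    public ProcessTotalsState? Handle(LoadPagesState state, int? stopAfterPages = null)
    {
        var processedThisRun = 0;

        while (state.Cursor < _pages.Count)
        {
            if (processedThisRun == stopAfterPages)
            {
                return null;
            }

            state.RunningTotal += _pages[state.Cursor].Sum();
            state.Cursor++;
            processedThisRun++;
        }

        return new ProcessTotalsState(state.RunningTotal);
    }
}
```

Keeping the running total in the state rather than in a handler field matters: the handler instance does not survive a restart, while the persisted state does.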

Moves between states should be encapsulated in state handlers.

A state handler can return two kinds of results: Finished and FinishedWithResult. The first is used to move to another state without returning any task result, so the task processes the next state within the same execution. The second is used either to suspend task execution (when you need to wait for a callback from an outside service) or to end task execution (with success, failure or cancellation).

Waiting states

As state move logic is encapsulated in state handlers, you have to ensure that this abstraction does not leak into outside service code.

Imagine that your task's state machine is suspended, waiting for some outside callback. This callback is the result of an operation you started in a previous stage, and that operation has some id. When you receive a callback saying that the operation has finished, you have to match the operation id to the id of the background task you need to proceed.

The naive approach is to add a "match table" that stores pairs of operation ids and background task ids. When you receive a callback, you query that table to determine which task to proceed.

Validation is where this approach falls apart. In case of a failure, you probably do not want to proceed a task that is in an invalid state, so you need to check whether the task's state comes after the one that triggers the outside operation and suspends the task. The problem is that you have probably already moved the task's state machine to the state that follows the outside operation's completion, so you must check the task state against that state. This is the leakage discussed above: the state sequence is encapsulated in state handlers, yet you must know it here.

To solve this problem, you should use waiting states. A waiting state is a special kind of state: it does not explicitly denote a stage in your task execution; rather, it denotes that the task is waiting for a specific outside operation.
The handler that starts the outside operation must move the task to this waiting state with a suspended result, while the waiting state's handler performs the further move to the next stage of task execution.

Once a waiting state is added to the task's state machine, check against it when proceeding the task.
Implementation details are presented further in the article.

Implementation

State machine

Define a type to be used as the base class for your states:

public abstract record TaskState;

Define the handler context, the handler result type, and the state handler interface:

public record StateHandlerContext(BackgroundTaskId BackgroundTaskId, StateTaskMetadata Metadata);
public abstract record StateHandlerResult
{
    private StateHandlerResult() { }

    public sealed record Finished(TaskState State) : StateHandlerResult;

    public sealed record FinishedWithResult(
        TaskState State,
        BackgroundTaskExecutionResult<EmptyExecutionResult, StateTaskError> Result) : StateHandlerResult;
}
public interface IStateHandler<in TState> where TState : TaskState
{
    ValueTask<StateHandlerResult> HandleAsync(
        TState state,
        StateHandlerContext context,
        CancellationToken cancellationToken);
}

Starting state

To enforce the abstraction when implementing the background task itself, you should always have a starting state; its handler must move the task to the state corresponding to the first stage of your scenario.

public sealed record StartingState : TaskState;
public class StartingStateHandler : IStateHandler<StartingState>
{
    public ValueTask<StateHandlerResult> HandleAsync(
        StartingState state,
        StateHandlerContext context,
        CancellationToken cancellationToken)
    {
        var result = new StateHandlerResult.FinishedWithResult(
            new WaitingFirstState(Guid.NewGuid()),
            BackgroundTaskExecutionResult.Suspended.ForEmptyResult().ForError<StateTaskError>());

        return ValueTask.FromResult<StateHandlerResult>(result);
    }
}

In this example, the starting state immediately puts the task into waiting for the operation associated with FirstState. For example purposes, the external operation id is randomly generated here. It is stored in the waiting state for this operation:

public sealed record WaitingFirstState(Guid OperationId) : TaskState;

Waiting states

The waiting state was introduced in the previous sample. As discussed before, it denotes that the task is waiting for some operation to complete (in this case, the operation associated with moving to FirstState).

Let's implement some service code to proceed the task's execution. It runs when processing the callback that reports the operation has finished.

public async Task ProceedFirstStateAsync(Guid operationId, CancellationToken cancellationToken)
{
    var query = BackgroundTaskQuery.Build(builder => builder
        .WithName(StateBackgroundTask.Name)
        .WithState(BackgroundTaskState.Suspended)
        .WithExecutionMetadata(new StateTaskExecutionMetadata { State = new WaitingFirstState(operationId) }));

    await _runner
        .ProceedBackgroundTask
        .WithQuery(query)
        .WithoutExecutionMetadataModification()
        .ProceedAsync(cancellationToken);
}

In the example above, we create a query for a suspended background task with the name we need. Additionally, we add execution metadata to include only background tasks in WaitingFirstState with the received operation id.

Keep in mind the difference between the BackgroundTask's state and the state stored in execution metadata. The former is the state of the task as a whole (whether it is completed, suspended, failed or cancelled), and the latter is the current execution state (your state machine).

That way, if there is no such task, we can handle the ProceedTaskResult.TaskNotFound result returned by the ProceedAsync method and act accordingly.
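The query above matches the stored state against a freshly constructed WaitingFirstState. A minimal in-memory sketch of why this keeps outside code decoupled, assuming a hypothetical SuspendedTask record and CallbackMatcher helper in place of the library's query machinery (which this sketch does not model): C# records compare by value, so the callback handler only needs to know the waiting state for its own operation, not which stage follows it.

```csharp
using System;

public abstract record TaskState;
public sealed record WaitingFirstState(Guid OperationId) : TaskState;
public sealed record FirstState : TaskState;

// Hypothetical stand-in for a suspended task together with its current state.
public sealed record SuspendedTask(long Id, TaskState State);

public static class CallbackMatcher
{
    // Record value equality: two WaitingFirstState instances are equal
    // exactly when their OperationId values are equal.
    public static bool CanProceed(SuspendedTask task, Guid operationId)
        => task.State.Equals(new WaitingFirstState(operationId));
}
```

A task that has already moved past the waiting state (for example to FirstState) no longer matches, so outside code never has to reason about the state sequence.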

When the task is proceeded, it is scheduled for execution and resumes in WaitingFirstState (in the current example), so we have to implement a handler that moves the task's state to the next stage.

public class WaitingFirstStateHandler : IStateHandler<WaitingFirstState>
{
    public ValueTask<StateHandlerResult> HandleAsync(
        WaitingFirstState state,
        StateHandlerContext context,
        CancellationToken cancellationToken)
    {
        var result = new StateHandlerResult.Finished(new FirstState());
        return ValueTask.FromResult<StateHandlerResult>(result);
    }
}

In our case, FirstState comes after WaitingFirstState. Note that StateHandlerResult.Finished is used here: the task does not have to wait for anything, so it can process FirstState in the current execution.

Background task

When using the State pattern, your background task becomes an engine that runs the state machine. It has to 1) execute the handler for the current state, and 2) process the handler result.

Handling current state

To handle the current state, you need to determine which handler is designated for it. The simplest, and probably the best, way to do this is to match on state types with a switch expression. You will have to inject IServiceProvider into the background task to create the handlers themselves.

You can register your handlers in DI directly, or you can use ActivatorUtilities to create them (as in the following example).

private ValueTask<StateHandlerResult> HandleAsync(
    TaskState taskState,
    StateHandlerContext context,
    CancellationToken cancellationToken)
{
    return taskState switch
    {
        StartingState state => ActivatorUtilities
            .CreateInstance<StartingStateHandler>(_serviceProvider)
            .HandleAsync(state, context, cancellationToken),

        WaitingFirstState state => ActivatorUtilities
            .CreateInstance<WaitingFirstStateHandler>(_serviceProvider)
            .HandleAsync(state, context, cancellationToken),

        FirstState state => ActivatorUtilities
            .CreateInstance<FirstStateHandler>(_serviceProvider)
            .HandleAsync(state, context, cancellationToken),

        CompletedState state => ActivatorUtilities
            .CreateInstance<CompletedStateHandler>(_serviceProvider)
            .HandleAsync(state, context, cancellationToken),

        _ => throw new ArgumentOutOfRangeException(nameof(taskState), taskState, "Could not resolve state handler"),
    };
}

To simplify type inference for the handlers, you can call the HandleAsync method directly in the switch expression.

Updating state machine

After the current state handler executes, you must update the task context to reflect the changes in your state machine. Your task's ExecuteAsync method would look like this:

public async Task<BackgroundTaskExecutionResult<EmptyExecutionResult, StateTaskError>> ExecuteAsync(
    BackgroundTaskExecutionContext<StateTaskMetadata, StateTaskExecutionMetadata> executionContext,
    CancellationToken cancellationToken)
{
    executionContext.ExecutionMetadata.State ??= new StartingState();

    var context = new StateHandlerContext(executionContext.Id, executionContext.Metadata);

    while (true)
    {
        var result = await HandleAsync(executionContext.ExecutionMetadata.State, context, cancellationToken);

        if (result is StateHandlerResult.Finished finished)
        {
            executionContext.ExecutionMetadata.State = finished.State;
            continue;
        }

        if (result is StateHandlerResult.FinishedWithResult finishedWithResult)
        {
            executionContext.ExecutionMetadata.State = finishedWithResult.State;
            return finishedWithResult.Result;
        }

        return BackgroundTaskExecutionResult
            .Failure
            .ForEmptyResult()
            .WithError(new StateTaskError($"Invalid handler result = {result}"));
    }
}

Note that the first line of ExecuteAsync initializes StartingState when needed (this happens on the task's first run).

Note that if your handler context depends on some changing data (which in this example it does not), you may have to move its creation inside the while loop.

In the example above, we process the handler result. We always update the current state in execution metadata to the state from the result. If the result is FinishedWithResult, we return the BackgroundTaskExecutionResult it contains, finishing the current task execution.

If the returned handler result matches neither the Finished nor the FinishedWithResult case, something went really wrong, so we return a failure result from the task.

Completed state

You should always have a completed state. The move to this state is performed by the state handler corresponding to the last stage of your task execution. This handler should return FinishedWithResult with the completed state object and a completed BackgroundTaskExecutionResult.

To keep state handling unified, you should create a completed state handler that returns FinishedWithResult with the same completed state and a failed BackgroundTaskExecutionResult (optionally with an error message saying that an already completed task was invoked).

public sealed record CompletedState : TaskState;

public class CompletedStateHandler : IStateHandler<CompletedState>
{
    public ValueTask<StateHandlerResult> HandleAsync(
        CompletedState state,
        StateHandlerContext context,
        CancellationToken cancellationToken)
    {
        var error = new StateTaskError("Task is in completed state");

        var result = new StateHandlerResult.FinishedWithResult(
            state,
            BackgroundTaskExecutionResult.Failure.ForEmptyResult().WithError(error));

        return ValueTask.FromResult<StateHandlerResult>(result);
    }
}
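Putting the pieces together, the following self-contained sketch runs the same engine loop without the library. ExecutionOutcome, ExecutionMetadata and StateMachineEngine are hypothetical, simplified stand-ins for BackgroundTaskExecutionResult, StateTaskExecutionMetadata and the background task, and the handlers are inlined into the switch for brevity. In the real task the library persists the metadata between runs, and the second run only happens after the task is proceeded.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical simplified stand-in for BackgroundTaskExecutionResult.
public enum ExecutionOutcome { Suspended, Completed, Failed }

public abstract record TaskState;
public sealed record StartingState : TaskState;
public sealed record WaitingFirstState(Guid OperationId) : TaskState;
public sealed record FirstState : TaskState;
public sealed record CompletedState : TaskState;

public abstract record StateHandlerResult
{
    private StateHandlerResult() { }

    public sealed record Finished(TaskState State) : StateHandlerResult;

    public sealed record FinishedWithResult(TaskState State, ExecutionOutcome Outcome) : StateHandlerResult;
}

// Hypothetical stand-in for the persisted execution metadata.
public sealed class ExecutionMetadata
{
    public TaskState? State { get; set; }
}

public sealed class StateMachineEngine
{
    // Arms are listed in stage order, doubling as the scenario overview.
    private static ValueTask<StateHandlerResult> HandleAsync(TaskState state) => state switch
    {
        StartingState => Done(new StateHandlerResult.FinishedWithResult(
            new WaitingFirstState(Guid.NewGuid()), ExecutionOutcome.Suspended)),
        WaitingFirstState => Done(new StateHandlerResult.Finished(new FirstState())),
        FirstState => Done(new StateHandlerResult.FinishedWithResult(
            new CompletedState(), ExecutionOutcome.Completed)),
        CompletedState completed => Done(new StateHandlerResult.FinishedWithResult(
            completed, ExecutionOutcome.Failed)),
        _ => throw new ArgumentOutOfRangeException(nameof(state), state, "Could not resolve state handler"),
    };

    private static ValueTask<StateHandlerResult> Done(StateHandlerResult result) => ValueTask.FromResult(result);

    public async Task<ExecutionOutcome> ExecuteAsync(ExecutionMetadata metadata, CancellationToken cancellationToken)
    {
        metadata.State ??= new StartingState();

        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var result = await HandleAsync(metadata.State);

            switch (result)
            {
                case StateHandlerResult.Finished finished:
                    metadata.State = finished.State; // move and keep going in this execution
                    continue;
                case StateHandlerResult.FinishedWithResult withResult:
                    metadata.State = withResult.State; // move and end this execution
                    return withResult.Outcome;
                default:
                    return ExecutionOutcome.Failed;
            }
        }
    }
}
```

Calling ExecuteAsync three times on the same metadata walks the lifecycle: the first run suspends in WaitingFirstState, the second (after the callback) goes from WaitingFirstState through FirstState to CompletedState in a single execution, and any further run hits the completed state handler and fails.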

Summary

This task implementation continuously executes state handlers until it receives a FinishedWithResult handler result. If that result contains a suspended BackgroundTaskExecutionResult, the task waits for you to proceed it. If the received BackgroundTaskExecutionResult is terminal (i.e., completed, failed or cancelled), the task finishes.

The described approach helps to cleanly decompose a big business process implemented as a background task while maintaining the logical cohesion of the process.

Concerns

Stage sequence cohesion

As we have encapsulated state moves in state handlers, it can become tricky to determine the order of the state sequence.

We cannot rely on IDE or file system ordering, as your states almost certainly do not sort alphabetically. The workaround of adding number prefixes to state and state handler files is also messy and undesirable (e.g., inserting a stage in the middle of the sequence would require renaming all subsequent state files).

A feasible solution is to use the HandleAsync switch expression as the source of ordering. It is much easier to move and insert code blocks than to rename a bunch of files. It also makes sense for the scenario's entry point (the background task) to serve as the scenario overview.

This article does not cover complex state machine scenarios (e.g., with conditional branching). Nevertheless, the solution still applies to them, except that with branching you would still have to rely on state handler source code to determine the ordering. The solution becomes somewhat tricky when the state machine contains cycles. However, that is quite a niche case, and you will have to accept some trade-off when flattening such a machine into a linear listing anyway, unless you implement a custom state machine builder (which is well out of scope of this article).

Out of order suspend/proceed

This issue can arise during the waiting-state part of task execution. If the outside operation can complete very quickly, its callback may be received before the background task execution that yielded the suspended result has finished. In that case, your code may attempt to proceed a task that does not (yet) match the query, leading to errors.

If this scenario is possible in your use case, you should consider using the Transactional Inbox pattern to store and retry outside operation callbacks (transactional boxes will be implemented in the Itmo.Dev.Platform.MessagePersistence package, TBA).

Alternatively, if you do not have this option, you can start another background task to retry the callback. That said, this approach has performance and resource usage implications and may not suit you. It is not the preferred or suggested way to solve this issue and should be considered a workaround rather than a feasible solution.
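A minimal in-memory sketch of the "store and retry the callback" idea, assuming a hypothetical proceed delegate that reports whether a matching suspended task was found (conceptually similar to handling ProceedTaskResult.TaskNotFound). PendingCallbackBuffer and ProceedOutcome are illustrative names. This is not a transactional inbox: nothing is persisted, so callbacks buffered here are lost on restart.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical outcome of trying to proceed a task for an operation id.
public enum ProceedOutcome { Proceeded, TaskNotFound }

public sealed class PendingCallbackBuffer
{
    private readonly Func<Guid, ProceedOutcome> _proceed; // hypothetical proceed operation
    private readonly Queue<Guid> _pending = new();

    public PendingCallbackBuffer(Func<Guid, ProceedOutcome> proceed) => _proceed = proceed;

    public int PendingCount => _pending.Count;

    // Called when an outside operation reports completion. If the task has not
    // suspended yet, the callback is kept instead of being dropped.
    public void OnCallback(Guid operationId)
    {
        if (_proceed(operationId) == ProceedOutcome.TaskNotFound)
        {
            _pending.Enqueue(operationId);
        }
    }

    // Called periodically (e.g. from a timer); retries each pending callback once.
    public void RetryPending()
    {
        var count = _pending.Count;

        for (var i = 0; i < count; i++)
        {
            var operationId = _pending.Dequeue();

            if (_proceed(operationId) == ProceedOutcome.TaskNotFound)
            {
                _pending.Enqueue(operationId);
            }
        }
    }
}
```

A production version would also need a retry limit or expiry, so that callbacks for tasks that will never suspend do not accumulate indefinitely.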