Skip to content

Commit

Permalink
Merge pull request #190 from microsoft/chgeuer/nextsteps
Browse files Browse the repository at this point in the history
Chgeuer/nextsteps
  • Loading branch information
chgeuer authored Sep 25, 2023
2 parents 715a05b + 3258fb0 commit dcae19a
Show file tree
Hide file tree
Showing 13 changed files with 223 additions and 70 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -356,3 +356,4 @@ MigrationBackup/
launchSettings.json
.idea
/testcaptures
/captures
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ Configure the appropriate endpoints for EventHub and Storage via environment var
- Set `AZURE_METERING_INFRA_CAPTURE_FILENAME_FORMAT` to the proper format of the blobs in the capture container, something like `{Namespace}/{EventHub}/p{PartitionId}--{Year}-{Month}-{Day}--{Hour}-{Minute}-{Second}`, namely the value from the EventHub's ARM configuration, `archiveDescription.destination.properties.archiveNameFormat`.
Check the [documentation][eventhub-capture-format] for details

### Snapshot frequency

The aggregation component regularly creates snapshots of the latest state, i.e. for each Event Hub partition, it creates a corresponding JSON file representing the state. The environment variables `AZURE_METERING_MAX_DURATION_BETWEEN_SNAPSHOTS` and `AZURE_METERING_MAX_NUMBER_OF_EVENTS_BETWEEN_SNAPSHOTS` can be used to specify how often a snapshot is created (whichever comes first): For example, `AZURE_METERING_MAX_DURATION_BETWEEN_SNAPSHOTS="00:05:00"` and `AZURE_METERING_MAX_NUMBER_OF_EVENTS_BETWEEN_SNAPSHOTS="2000"` ensure that at least every 5 minutes or every 2000 processed events, a new snapshot is created.

### Local dev setup

For local development, set a few environment variables. On Windows, you can set the local user's environment variables with this script:
Expand Down
11 changes: 11 additions & 0 deletions WHATS_NEW.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# What's new?

## Version 1.1

- **2023-09-18**: Added a configuration option for controlling the frequency of state snapshot creation in blob storage. The environment variables `AZURE_METERING_MAX_DURATION_BETWEEN_SNAPSHOTS` and `AZURE_METERING_MAX_NUMBER_OF_EVENTS_BETWEEN_SNAPSHOTS` can be used to specify how often a snapshot is created (whichever comes first): For example, `AZURE_METERING_MAX_DURATION_BETWEEN_SNAPSHOTS="00:05:00"` and `AZURE_METERING_MAX_NUMBER_OF_EVENTS_BETWEEN_SNAPSHOTS="2000"` ensure that at least every 5 minutes or every 2000 processed events, a new snapshot is created.
- **2023-09-14**: Added a diagnostics tool (`src/Tools/ReprocessLocalEventHubCaptureFiles`) which can be used to replay and analyze locally downloaded Event Hub Capture files.
- **2023-09-14:** Added an option to `RemoveUnprocessedMessages` which purges *all* unprocessed messages from a partition's state file: `{ "type": "RemoveUnprocessedMessages", "value": { "partitionId": "5", "all": "all" } }`
- **2023-09-14:** Added support for 2-year and 3-year SaaS offers, i.e., the `RenewalInterval` can not only take the values `Monthly` and `Annually`, but also `2-years` and `3-years`.
- **2023-08-31:** Added business logic unit tests based entirely on JSON files. For example, [src/Metering.Tests/data/BusinessLogic/RefreshIncludedQuantities](https://github.com/microsoft/metered-billing-accelerator/tree/main/src/Metering.Tests/data/BusinessLogic/RefreshIncludedQuantities) contains a full series of events and the state files resulting from applying each event. The event's JSON filename must contain the EventHubs timestamp. All files must contain the sequence number. Check the [README.md in the `src/Metering.Tests/data/BusinessLogic` folder](https://github.com/microsoft/metered-billing-accelerator/tree/main/src/Metering.Tests/data/BusinessLogic).


2 changes: 2 additions & 0 deletions repo.url
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[InternetShortcut]
URL=https://github.com/microsoft/metered-billing-accelerator
10 changes: 10 additions & 0 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Shared MSBuild properties: adds the Nerdbank.GitVersioning package reference to projects that pick up this file. -->
<Project ToolsVersion="Current" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<!-- https://github.com/microsoft/DockerTools/issues/209#issuecomment-588232318 -->
<!-- The item group is skipped for docker-compose (.dcproj) projects; see the issue linked above. -->
<ItemGroup Condition="'$(MSBuildProjectExtension)' != '.dcproj'">
<!-- Only reference the package when the project does not have a packages.config file. -->
<PackageReference Include="Nerdbank.GitVersioning" Condition="!Exists('packages.config')">
<!-- PrivateAssets=all: the package is used at build time only and is not exposed to consumers of the project. -->
<PrivateAssets>all</PrivateAssets>
<Version>3.6.133</Version>
</PackageReference>
</ItemGroup>
</Project>
35 changes: 0 additions & 35 deletions src/Metering.BaseTypes/MeterCollectionLogic.fs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ module MeterCollectionLogic =
/// Builds a list transformation that applies `mapping` to every element
/// satisfying `predicate` and leaves all other elements untouched.
let private listMapIf (predicate: 'T -> bool) (mapping: 'T -> 'T) : ('T list -> 'T list) =
    let mapEntry entry =
        match predicate entry with
        | true -> mapping entry
        | false -> entry

    List.map mapEntry


let private listMapIf2 (finalizer: 'T -> 'T) (mappingNonMatch: 'T -> 'T) (predicate: 'T -> bool) (mappingMatch: 'T -> 'T) (l: 'T list) : ('T list * bool) =
// Iterate over the collection.
// If the predicate matches an entry, apply the mappingMatch function (and later indicate we found a matching entry)
Expand All @@ -102,40 +101,6 @@ module MeterCollectionLogic =

result, found

//let private listMapMetersIf (messagePosition: MessagePosition) predicate mappingMatch =
// let mappingMatch =
// mappingMatch
// >> Meter.closeHour messagePosition.PartitionTimestamp
// let mappingNonMatch =
// Meter.closePreviousHourIfNeeded messagePosition.PartitionTimestamp
// >> Meter.resetCountersIfNewBillingCycleStarted messagePosition
// let finalizer =
// Meter.setLastProcessedMessage messagePosition

// listMapIf2
// finalizer
// mappingNonMatch
// predicate
// mappingMatch

/// Applies `updateExistingMeter` (followed by recording the message position as the
/// last processed message) to every meter matching `marketplaceResourceId`, and stamps
/// the collection's `LastUpdate` with `messagePosition`. Non-matching meters are kept as-is.
let private experiment
    (marketplaceResourceId: MarketplaceResourceId)
    (messagePosition: MessagePosition)
    (updateExistingMeter: MessagePosition -> Meter -> Meter)
    (state: MeterCollection)
    : MeterCollection =

    let isTargetMeter = Meter.matches marketplaceResourceId
    let applyUpdate = updateExistingMeter messagePosition
    let recordPosition = Meter.setLastProcessedMessage messagePosition

    // First apply the caller's update, then remember which message caused it.
    let refreshMeter meter = meter |> applyUpdate |> recordPosition

    { state with
        Meters = listMapIf isTargetMeter refreshMeter state.Meters
        LastUpdate = Some messagePosition }

let private handleSubscriptionPurchased (subscriptionCreationInformation: SubscriptionCreationInformation) (messagePosition: MessagePosition) (state: MeterCollection) : MeterCollection =
let matches = Meter.matches subscriptionCreationInformation.Subscription.MarketplaceResourceId
if state.Meters |> List.exists matches
Expand Down
51 changes: 51 additions & 0 deletions src/Metering.Runtime/MeteringConnections.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
namespace Metering.Integration

open System
open System.Runtime.CompilerServices
open System.Runtime.InteropServices
open Microsoft.Extensions.Configuration
open NodaTime
open Azure.Core
open Azure.Storage.Blobs
open Azure.Messaging.EventHubs
Expand All @@ -23,9 +26,41 @@ type EventHubConfig =
CaptureStorage: CaptureStorage option
InfraStructureCredentials: TokenCredential }

/// Optional limits controlling how often a state snapshot is created.
/// A value of None means the corresponding default inside
/// SnapshotIntervalConfiguration.shouldCreateSnapshot is used (1 hour / 1000 events).
type SnapshotIntervalConfiguration =
// Upper bound for the partition-timestamp difference between the last snapshot and the current position.
{ MaximumDurationBetweenSnapshots: Duration option
// Upper bound for the sequence-number difference between the last snapshot and the current position.
MaximumNumberOfEventsBetweenSnapshots: int64 option }

[<Extension>]
module SnapshotIntervalConfiguration =
    /// Decides whether a new snapshot is due, given the position of the last snapshot
    /// and the current position within the same partition.
    ///
    /// A snapshot is due when the partition-timestamp difference exceeds the configured
    /// maximum duration (default: 1 hour) or the sequence-number difference exceeds the
    /// configured maximum number of events (default: 1000). Both comparisons are strict.
    ///
    /// Fails with an exception when the two positions belong to different partitions.
    let shouldCreateSnapshot (lastSnapshot: MessagePosition) (currentPosition: MessagePosition) (config: SnapshotIntervalConfiguration) : bool =
        if lastSnapshot.PartitionID <> currentPosition.PartitionID then
            failwithf "Cannot compare positions from different partitions: lastSnapshot: %A currentPosition: %A" lastSnapshot currentPosition

        // Each limit falls back to its own default independently of the other one.
        let durationLimit = defaultArg config.MaximumDurationBetweenSnapshots (Duration.FromHours(1.0))
        let eventLimit = defaultArg config.MaximumNumberOfEventsBetweenSnapshots 1000L

        let elapsed = currentPosition.PartitionTimestamp - lastSnapshot.PartitionTimestamp
        let processedEvents = currentPosition.SequenceNumber - lastSnapshot.SequenceNumber

        elapsed > durationLimit || processedEvents > eventLimit

    /// Extension-method variant of `shouldCreateSnapshot` taking the configuration first,
    /// so it can be invoked as `config.ShouldCreateSnapshot(lastSnapshot, currentPosition)`.
    [<Extension>]
    let ShouldCreateSnapshot (config: SnapshotIntervalConfiguration) (lastSnapshot: MessagePosition) (currentPosition: MessagePosition) : bool =
        shouldCreateSnapshot lastSnapshot currentPosition config


type MeteringConnections =
{ MeteringAPICredentials: MeteringAPICredentials
SnapshotStorage: BlobContainerClient
SnapshotIntervalConfiguration: SnapshotIntervalConfiguration
EventHubConfig: EventHubConfig }

static member private environmentVariablePrefix = "AZURE_METERING_"
Expand Down Expand Up @@ -63,6 +98,20 @@ type MeteringConnections =
| (None, None, None) -> ManagedIdentity
| _ -> failwith $"The {nameof(MeteringAPICredentials)} configuration is incomplete."

/// Reads the optional snapshot interval settings through the supplied lookup function.
///
/// `get` is called with the setting names "MAX_DURATION_BETWEEN_SNAPSHOTS" and
/// "MAX_NUMBER_OF_EVENTS_BETWEEN_SNAPSHOTS"; a result of None leaves the corresponding
/// limit unset. (Presumably the caller's `get` adds the environment variable prefix; verify against caller.)
///
/// The duration is parsed as a TimeSpan (e.g. "00:05:00"), the event count as an Int64.
/// A present but malformed value makes the respective Parse call throw.
static member loadSnapshotIntervalConfigurationFromEnvironment (get: (string -> string option)) : SnapshotIntervalConfiguration =
    // Parse with the invariant culture so that a given setting is interpreted
    // identically regardless of the host's regional settings.
    let invariant = System.Globalization.CultureInfo.InvariantCulture

    let maxDurationBetweenSnapshots =
        get "MAX_DURATION_BETWEEN_SNAPSHOTS"
        |> Option.map (fun s -> TimeSpan.Parse(s, invariant) |> Duration.FromTimeSpan)

    let maxNumberOfEventsBetweenSnapshots =
        get "MAX_NUMBER_OF_EVENTS_BETWEEN_SNAPSHOTS"
        |> Option.map (fun s -> Int64.Parse(s, invariant))

    { MaximumDurationBetweenSnapshots = maxDurationBetweenSnapshots
      MaximumNumberOfEventsBetweenSnapshots = maxNumberOfEventsBetweenSnapshots }

static member private getFromConfig (get: (string -> string option)) (consumerGroupName: string) =
let containerClientWith (cred: TokenCredential) uri = new BlobContainerClient(blobContainerUri = new Uri(uri), credential = cred)

Expand All @@ -79,6 +128,7 @@ type MeteringConnections =

{ MeteringAPICredentials = MeteringConnections.getMeteringApiCredential get
SnapshotStorage = "INFRA_SNAPSHOTS_CONTAINER" |> MeteringConnections.getRequired get |> containerClientWith infraCred
SnapshotIntervalConfiguration = MeteringConnections.loadSnapshotIntervalConfigurationFromEnvironment get
EventHubConfig =
{ CheckpointStorage = "INFRA_CHECKPOINTS_CONTAINER" |> MeteringConnections.getRequired get |> containerClientWith infraCred
CaptureStorage = captureStorage
Expand Down Expand Up @@ -140,3 +190,4 @@ type MeteringConnections =
clientOptions = new EventHubProducerClientOptions(
ConnectionOptions = new EventHubConnectionOptions(
TransportType = EventHubsTransportType.AmqpTcp)))

19 changes: 16 additions & 3 deletions src/Metering.RuntimeCS/AggregationWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public AggregationWorker(ILogger logger, MeteringConfigurationProvider mcp)

public async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Worker starting (AssemblyFileVersion {AssemblyFileVersion}, GitCommitId {GitCommitId})",
ThisAssembly.AssemblyFileVersion,
ThisAssembly.GitCommitId);

List<IDisposable> subscriptions = new();

// pretty-print which partitions we already 'own'
Expand Down Expand Up @@ -169,12 +173,21 @@ private IDisposable SubscribeEmitter(IObservable<MeterCollection> events, Cancel
);
}

private readonly ConcurrentDictionary<PartitionID, MessagePosition> lastSnapshots = new();
/// <summary>
/// Stores the state of a partition in snapshot storage once the configured snapshot interval
/// (see <c>SnapshotIntervalConfiguration.ShouldCreateSnapshot</c>) has been exceeded since the
/// last recorded snapshot position of that partition.
/// </summary>
/// <param name="partitionId">Partition the state belongs to; used as key into <c>lastSnapshots</c>.</param>
/// <param name="meterCollection">Current state; nothing happens when it has no <c>LastUpdate</c>.</param>
/// <param name="prefix">Produces the prefix for the log message; only invoked when a snapshot was stored.</param>
private void RegularlyCreateSnapshots(PartitionID partitionId, MeterCollection meterCollection, Func<string> prefix)
{
    if (meterCollection.LastUpdate == null || !meterCollection.LastUpdate.IsSome())
    {
        // Without a last-update position there is nothing to compare against.
        return;
    }

    MessagePosition currentPosition = meterCollection.LastUpdate.Value;

    // The first position seen for a partition becomes the baseline; no snapshot is written for it.
    MessagePosition lastSnapshot = lastSnapshots.GetOrAdd(partitionId, currentPosition);
    bool justStarted = lastSnapshot == currentPosition;
    bool shouldCreateSnapshot = config.MeteringConnections.SnapshotIntervalConfiguration.ShouldCreateSnapshot(lastSnapshot, currentPosition);

    if (!justStarted && shouldCreateSnapshot)
    {
        MeterCollectionStore.storeLastState(config.MeteringConnections, meterCollection: meterCollection).Wait();

        // Advance the baseline to the position just persisted. Without this, every event after
        // the first exceeded threshold would be compared against the initial position and would
        // trigger a snapshot again. Done after the store call so a failed store is retried.
        lastSnapshots[partitionId] = currentPosition;

        _logger.LogInformation($"{prefix()} Saved state {partitionId.value}#{meterCollection.getLastSequenceNumber()}");
    }
}
}
Expand Down
1 change: 1 addition & 0 deletions src/Metering.Tests/Metering.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
<None Include="data\Capture\p9--2022-12-09--16-50-12.avro"><CopyToOutputDirectory>Always</CopyToOutputDirectory></None>
<Compile Include="BusinessLogicTests.fs" />
<Compile Include="BillingUnitTest.fs" />
<Compile Include="RuntimeUnitTest.fs" />
<Compile Include="PartitionIdTests.fs" />
<Compile Include="WaterfallUnitTests.fs" />
<Compile Include="SerializationUnitTest.fs" />
Expand Down
60 changes: 60 additions & 0 deletions src/Metering.Tests/RuntimeUnitTest.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

module Metering.NUnitTests.RuntimeUnitTest

open Metering.BaseTypes
open Metering.Integration
open Metering.BaseTypes.EventHub
open NUnit.Framework

[<Test>]
let ``checkGenerateSnapshot``() =
    // Stand-in for the configuration lookup: serves the two snapshot settings from the given pair.
    let get (dur: string option, evt: string option) (name: string) : string option =
        match name with
        | "MAX_DURATION_BETWEEN_SNAPSHOTS" -> dur
        | "MAX_NUMBER_OF_EVENTS_BETWEEN_SNAPSHOTS" -> evt
        | _ -> None

    let cfg dur evt = MeteringConnections.loadSnapshotIntervalConfigurationFromEnvironment (get (dur, evt))

    // Position in partition "0" with the given sequence number and timestamp.
    let time sequenceNr dateTimeStr = MessagePosition.create "0" sequenceNr (MeteringDateTime.fromStr dateTimeStr)

    // Evaluates the snapshot decision for the given settings and pair of positions.
    let decide dur evt lastSnapshot currentPosition =
        cfg dur evt
        |> SnapshotIntervalConfiguration.shouldCreateSnapshot lastSnapshot currentPosition

    // Event limit exceeded (2001 > 2000), duration limit not reached.
    Assert.IsTrue(
        decide (Some "00:05:00") (Some "2000")
            (time 0l "2024-01-01T00:00:00Z")
            (time 2001l "2024-01-01T00:00:01Z"))

    // Neither limit exceeded.
    Assert.IsFalse(
        decide (Some "00:05:00") (Some "2000")
            (time 0l "2024-01-01T00:00:00Z")
            (time 1999l "2024-01-01T00:00:01Z"))

    // Only the duration is configured; 999 events stay below the default event limit.
    Assert.IsFalse(
        decide (Some "1.00:00:00") None
            (time 0l "2024-01-01T00:00:00Z")
            (time 999l "2024-01-01T00:00:01Z"))

    // Only the event count is configured; one second stays below the default duration.
    Assert.IsFalse(
        decide None (Some "2000")
            (time 0l "2024-01-01T00:00:00Z")
            (time 1999l "2024-01-01T00:00:01Z"))

    // Only the event count is configured; more than an hour exceeds the default duration.
    Assert.IsTrue(
        decide None (Some "2000")
            (time 0l "2024-01-01T00:00:00Z")
            (time 1999l "2024-01-01T01:00:01Z"))

    // Without configuration, we should use the default values (at least hourly, or every 1000 events)
    Assert.IsTrue(
        decide None None
            (time 0l "2024-01-01T00:00:00Z")
            (time 1001l "2024-01-01T00:00:01Z"))
4 changes: 3 additions & 1 deletion src/Metering.sln
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ VisualStudioVersion = 17.0.31903.59
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{BE422FBF-7D05-4571-AAFA-874471D110F2}"
ProjectSection(SolutionItems) = preProject
Directory.Build.props = Directory.Build.props
Dockerfile = Dockerfile
global.json = global.json
..\docs\MarketplaceDataStructures.md = ..\docs\MarketplaceDataStructures.md
..\NOTICE.md = ..\NOTICE.md
..\README.md = ..\README.md
..\docs\state-transistions.md = ..\docs\state-transistions.md
..\version.json = ..\version.json
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Demos", "Demos", "{63515470-9590-4E66-9C64-9F0D3B86547E}"
Expand Down Expand Up @@ -67,7 +69,7 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "ReplayCaptureForPartition",
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Applications", "Applications", "{0198CFB0-AC6C-4808-BDA1-DB560554A1C0}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "ReprocessLocalEventHubCaptureFiles", "Tools\ReprocessLocalEventHubCaptureFiles\ReprocessLocalEventHubCaptureFiles.fsproj", "{D1634B6F-86D0-4AE3-BFD3-CE0A0A969811}"
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "ReprocessLocalEventHubCaptureFiles", "Tools\ReprocessLocalEventHubCaptureFiles\ReprocessLocalEventHubCaptureFiles.fsproj", "{D1634B6F-86D0-4AE3-BFD3-CE0A0A969811}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
Loading

0 comments on commit dcae19a

Please sign in to comment.