Skip to content

Commit

Permalink
Swinging door handles the end of the data stream correctly (#72)
Browse files Browse the repository at this point in the history
* Fix xml doc comment for ICompression.ArchiveIncoming

* Tests

* Renamed state field

* Store the previous snapshot too, as it's need for considering the penultimate datapoint

* Handle the archiving of the penultimate point
  • Loading branch information
gfoidl authored Dec 22, 2021
1 parent 31698c9 commit e47ae3d
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 85 deletions.
19 changes: 19 additions & 0 deletions data/swinging-door/trend3.plt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
reset

#set terminal dumb

set grid
set title 'Swinging door compression -- Trend3'
set xlabel 'Time'
set ylabel 'Value'

set xrange [0:10]
set yrange [-3:6]

set style line 2 lc rgb 'green' pt 9 # triangle

#set datafile separator ";"

# replot is also possible for the second plot
plot 'trend3_raw.csv' with linespoints title 'raw', \
'trend3_compressed.csv' with points ls 2 title 'compressed'
7 changes: 7 additions & 0 deletions data/swinging-door/trend3_compressed.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#x y
1 0
4 5
5 -2
6 5
8 3
9 5
19 changes: 19 additions & 0 deletions data/swinging-door/trend3_mini.plt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
reset

#set terminal dumb

set grid
set title 'Swinging door compression -- Trend3 (minimal repro)'
set xlabel 'Time'
set ylabel 'Value'

set xrange [4:10]
set yrange [-3:6]

set style line 2 lc rgb 'green' pt 9 # triangle

#set datafile separator ";"

# replot is also possible for the second plot
plot 'trend3_mini_raw.csv' with linespoints title 'raw', \
'trend3_mini_compressed.csv' with points ls 2 title 'compressed'
5 changes: 5 additions & 0 deletions data/swinging-door/trend3_mini_compressed.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#x y
5 -2
6 5
8 3
9 5
6 changes: 6 additions & 0 deletions data/swinging-door/trend3_mini_raw.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#x y
5 -2
6 5
7 4
8 3
9 5
10 changes: 10 additions & 0 deletions data/swinging-door/trend3_raw.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#x y
1 0
2 1
3 2
4 5
5 -2
6 5
7 4
8 3
9 5
130 changes: 95 additions & 35 deletions source/gfoidl.DataCompression/DataPointIndexedIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,20 @@ internal sealed class DataPointIndexedIterator<TList> : DataPointIterator
private DataPointIterator? _wrapperIterator;
private TList? _list;
private int _snapShotIndex;
private int _previousSnapshotIndex;
private int _lastArchivedIndex;
private int _incomingIndex;
//-----------------------------------------------------------------
//---------------------------------------------------------------------
private int SnapShotIndex
{
get => _snapShotIndex;
set
{
_previousSnapshotIndex = _snapShotIndex;
_snapShotIndex = value;
}
}
//---------------------------------------------------------------------
public void SetData(Compression compression, DataPointIterator wrappedIterator, TList source)
{
Debug.Assert(wrappedIterator is not null);
Expand All @@ -37,7 +48,7 @@ public override DataPointIterator Clone()

return clone;
}
//-----------------------------------------------------------------
//---------------------------------------------------------------------
public override bool MoveNext()
{
Debug.Assert(_list is not null);
Expand All @@ -58,17 +69,16 @@ public override bool MoveNext()

this.Init(_incoming);
_lastArchivedIndex = 0;
_snapShotIndex = 0;
this.SnapShotIndex = 0;
_state = IterateState;
_incomingIndex = 1;
return true;
}
case IterateState:
{
TList source = _list;
int snapShotIndex = _snapShotIndex;
int incomingIndex = _incomingIndex;
int lastArchivedIndex = _lastArchivedIndex;
TList source = _list;
int incomingIndex = _incomingIndex;
int lastArchivedIndex = _lastArchivedIndex;

while (true)
{
Expand All @@ -87,32 +97,50 @@ public override bool MoveNext()
if (!archive.Archive)
{
this.UpdateFilters(_incoming, _lastArchived);
snapShotIndex = incomingIndex++;
this.SnapShotIndex = incomingIndex++;
continue;
}

if (!archive.MaxDelta && _lastArchivedIndex != snapShotIndex && (uint)snapShotIndex < (uint)source.Count)
if (!archive.MaxDelta && _lastArchivedIndex != this.SnapShotIndex && (uint)this.SnapShotIndex < (uint)source.Count)
{
_lastArchived = source[snapShotIndex];
_lastArchivedIndex = snapShotIndex;
_snapShotIndex = incomingIndex;
_lastArchived = source[this.SnapShotIndex];
_lastArchivedIndex = this.SnapShotIndex;
this.SnapShotIndex = incomingIndex;
_incomingIndex = incomingIndex;
_state = _archiveIncomingState;
_state = _stateAfterArchive;
return true;
}

_snapShotIndex = snapShotIndex;
_incomingIndex = incomingIndex;
goto case ArchivePointState;
}

_state = EndOfDataState;
incomingIndex--;
if (incomingIndex != _lastArchivedIndex) // sentinel check
{
_lastArchived = source[incomingIndex];
return true;
_incomingIndex = incomingIndex;

if (_previousSnapshotIndex != _lastArchivedIndex)
{
// Construct a door from the last archived point to the
// incoming (final point), and check whether the penultimate
// point is to archive or not.
this.Init(_incoming);
this.UpdateFilters(_incoming, _lastArchived);
_previousSnapShot = _list[_previousSnapshotIndex];
ref var archive = ref this.IsPointToArchive(_previousSnapShot, _lastArchived);

if (archive.Archive)
{
_lastArchived = _previousSnapShot;
_state = EndOfDataState;
return true;
}
}

goto case EndOfDataState;
}

goto default;
}
case ArchiveIncomingState:
Expand All @@ -128,21 +156,36 @@ public override bool MoveNext()
int incomingIndex = _incomingIndex;
this.Init(_incoming);
this.UpdateFilters(_incoming, _lastArchived);
_incomingIndex = this.HandleSkipMinDeltaX(incomingIndex, _snapShotIndex);
_incomingIndex = this.HandleSkipMinDeltaX(incomingIndex, this.SnapShotIndex);
goto case IterateState;
}
case ArchivePointState:
{
int incomingIndex = _incomingIndex;
_lastArchived = _list[incomingIndex];
_lastArchivedIndex = incomingIndex;
_snapShotIndex = incomingIndex;
this.SnapShotIndex = incomingIndex;
_state = IterateState;
this.Init(_incoming);
incomingIndex = this.HandleSkipMinDeltaX(incomingIndex, _snapShotIndex);
incomingIndex = this.HandleSkipMinDeltaX(incomingIndex, this.SnapShotIndex);
_incomingIndex = incomingIndex + 1;
return true;
}
case EndOfDataState:
{
if ((uint)_incomingIndex < (uint)_list.Count)
{
DataPoint incoming = _list[_incomingIndex];
if (incoming != _lastArchived) // sentinel check
{
_lastArchived = _list[_incomingIndex];
_state = FinalState;
return true;
}
}

goto default;
}
case InitialState:
{
ThrowHelper.ThrowInvalidOperation(ThrowHelper.ExceptionResource.GetEnumerator_must_be_called_first);
Expand Down Expand Up @@ -173,7 +216,7 @@ public override DataPoint[] ToArray()

return arrayBuilder.ToArray();
}
//-----------------------------------------------------------------
//---------------------------------------------------------------------
public override List<DataPoint> ToList()
{
Debug.Assert(_list is not null);
Expand All @@ -198,7 +241,7 @@ private void BuildCollection<TBuilder>(TList source, ref TBuilder builder)
where TBuilder : ICollectionBuilder<DataPoint>
{
int incomingIndex = 0;
int snapShotIndex = 0;
this.SnapShotIndex = 0;
int lastArchivedIndex = 0;

if ((uint)incomingIndex >= (uint)source.Count) return;
Expand Down Expand Up @@ -227,16 +270,16 @@ private void BuildCollection<TBuilder>(TList source, ref TBuilder builder)
if (!archive.Archive)
{
this.UpdateFilters(incoming, _lastArchived);
snapShotIndex = incomingIndex++;
this.SnapShotIndex = incomingIndex++;
continue;
}

if (!archive.MaxDelta && lastArchivedIndex != snapShotIndex && (uint)snapShotIndex < (uint)source.Count)
if (!archive.MaxDelta && lastArchivedIndex != this.SnapShotIndex && (uint)this.SnapShotIndex < (uint)source.Count)
{
DataPoint snapShot = source[snapShotIndex];
DataPoint snapShot = source[this.SnapShotIndex];
_lastArchived = snapShot;
lastArchivedIndex = snapShotIndex;
snapShotIndex = incomingIndex;
lastArchivedIndex = this.SnapShotIndex;
this.SnapShotIndex = incomingIndex;
builder.Add(snapShot);

if (_archiveIncoming)
Expand All @@ -249,22 +292,38 @@ private void BuildCollection<TBuilder>(TList source, ref TBuilder builder)
this.Init(incoming);
this.UpdateFilters(incoming, _lastArchived);
incomingIndex++;
incomingIndex = this.HandleSkipMinDeltaX(incomingIndex, snapShotIndex);
incomingIndex = this.HandleSkipMinDeltaX(incomingIndex, this.SnapShotIndex);
continue;
}

_lastArchived = incoming;
lastArchivedIndex = incomingIndex;
snapShotIndex = incomingIndex;
_lastArchived = incoming;
lastArchivedIndex = incomingIndex;
this.SnapShotIndex = incomingIndex;
builder.Add(incoming);
this.Init(incoming);
incomingIndex = this.HandleSkipMinDeltaX(incomingIndex, snapShotIndex);
incomingIndex = this.HandleSkipMinDeltaX(incomingIndex, this.SnapShotIndex);
incomingIndex++;
}

incomingIndex--;
if (incomingIndex != lastArchivedIndex && (uint)incomingIndex < (uint)source.Count)
{
if (_previousSnapshotIndex != lastArchivedIndex && (uint)_previousSnapshotIndex < (uint)source.Count)
{
// Construct a door from the last archived point to the
// incoming (final point), and check whether the penultimate
// point is to archive or not.
this.Init(incoming);
this.UpdateFilters(incoming, _lastArchived);
_previousSnapShot = source[_previousSnapshotIndex];
ref var archive = ref this.IsPointToArchive(_previousSnapShot, _lastArchived);

if (archive.Archive)
{
builder.Add(_previousSnapShot);
}
}

builder.Add(source[incomingIndex]);
}
}
Expand Down Expand Up @@ -322,9 +381,10 @@ protected override void DisposeCore()
_wrapperIterator = null;
}

_snapShotIndex = -1;
_lastArchivedIndex = -1;
_incomingIndex = -1;
_snapShotIndex = -1;
_previousSnapshotIndex = -1;
_lastArchivedIndex = -1;
_incomingIndex = -1;

base.DisposeCore();
}
Expand Down
33 changes: 25 additions & 8 deletions source/gfoidl.DataCompression/DataPointIterator.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ private async IAsyncEnumerator<DataPoint> IterateCore(CancellationToken cancella
cancellationToken.ThrowIfCancellationRequested();

_lastArchived = incoming;
_snapShot = incoming;
this.SnapShot = incoming;
this.Init(incoming);
continue;
}

if (isSkipMinDeltaX)
{
if ((incoming.X - _snapShot.X) < _minDeltaX)
if ((incoming.X - this.SnapShot.X) < _minDeltaX)
continue;

isSkipMinDeltaX = false;
Expand All @@ -96,17 +96,17 @@ private async IAsyncEnumerator<DataPoint> IterateCore(CancellationToken cancella
if (!_archive.Archive)
{
this.UpdateFilters(incoming, _lastArchived);
_snapShot = incoming;
this.SnapShot = incoming;
continue;
}

if (!_archive.MaxDelta && _lastArchived != _snapShot)
if (!_archive.MaxDelta && _lastArchived != this.SnapShot)
{
yield return _snapShot;
yield return this.SnapShot;
cancellationToken.ThrowIfCancellationRequested();

_lastArchived = _snapShot;
_snapShot = incoming;
_lastArchived = this.SnapShot;
this.SnapShot = incoming;

if (_archiveIncoming)
{
Expand All @@ -127,14 +127,31 @@ private async IAsyncEnumerator<DataPoint> IterateCore(CancellationToken cancella
cancellationToken.ThrowIfCancellationRequested();

_lastArchived = incoming;
_snapShot = incoming;
this.SnapShot = incoming;
isSkipMinDeltaX = _minDeltaX.HasValue;
this.Init(incoming);
}

cancellationToken.ThrowIfCancellationRequested();
if (_incoming != _lastArchived) // sentinel-check
{
if (_previousSnapShot != _lastArchived)
{
// Construct a door from the last archived point to the
// incoming (final point), and check whether the penultimate
// point is to archive or not.
this.Init(_incoming);
this.UpdateFilters(_incoming, _lastArchived);
this.IsPointToArchive(_previousSnapShot, _lastArchived);

if (_archive.Archive)
{
yield return _previousSnapShot;
}
}

yield return _incoming;
}
}
//---------------------------------------------------------------------
private protected virtual async ValueTask BuildCollectionAsync<TBuilder>(TBuilder builder, CancellationToken cancellationToken)
Expand Down
Loading

0 comments on commit e47ae3d

Please sign in to comment.