Skip to content

Commit

Permalink
Added support for copying readable streams to writable streams and ad…
Browse files Browse the repository at this point in the history
…ded test coverage.
  • Loading branch information
MeltyPlayer committed Oct 17, 2023
1 parent ea7d495 commit 5adb318
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 13 deletions.
2 changes: 0 additions & 2 deletions Schema Tests/TextSchemaTestUtil.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System.IO;

using schema.text.reader;

using TextReader = schema.text.reader.TextReader;

namespace schema.text {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
using NUnit.Framework;

using schema.util.streams;

namespace schema.lib.SubstreamSharp {
public class RangedSubstreamTests {
namespace schema.util.streams {
public class RangedReadableSubstreamTests {
[Test]
public void TestFullSubstream() {
var s = new ReadableStream(new byte[] { 1, 2, 3, 4, 5, 6, 7 });
Expand Down
64 changes: 64 additions & 0 deletions Schema Tests/util/streams/ReadableStreamTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System;
using System.IO;

using NUnit.Framework;

namespace schema.util.streams {
public class ReadableStreamTests {
[Test]
public void TestPosition() {
var ms = new MemoryStream(new byte[] { 1, 2, 3 });
var rs = new ReadableStream(ms);

Assert.AreEqual(0, ms.Position);
Assert.AreEqual(0, rs.Position);

rs.Position = 2;
Assert.AreEqual(2, ms.Position);
Assert.AreEqual(2, rs.Position);
}

[Test]
public void TestLength() {
var ms = new MemoryStream(new byte[] { 1, 2, 3 });
var rs = new ReadableStream(ms);

Assert.AreEqual(3, ms.Length);
Assert.AreEqual(3, rs.Length);
}

[Test]
public void TestReadByte() {
var ms = new MemoryStream(new byte[] { 1, 2, 3 });
var rs = new ReadableStream(ms);

Assert.AreEqual(0, ms.Position);
Assert.AreEqual(0, rs.Position);

Assert.AreEqual(1, rs.ReadByte());
Assert.AreEqual(1, ms.Position);
Assert.AreEqual(1, rs.Position);

Assert.AreEqual(2, rs.ReadByte());
Assert.AreEqual(2, ms.Position);
Assert.AreEqual(2, rs.Position);
}

[Test]
public void TestReadSpan() {
var ms = new MemoryStream(new byte[] { 1, 2, 3 });
var rs = new ReadableStream(ms);

Assert.AreEqual(0, ms.Position);
Assert.AreEqual(0, rs.Position);

Span<byte> span = stackalloc byte[5];

Assert.AreEqual(3, rs.Read(span));
Assert.AreEqual(3, ms.Position);
Assert.AreEqual(3, rs.Position);

CollectionAssert.AreEqual(new[] { 1, 2, 3, 0, 0 }, span.ToArray());
}
}
}
107 changes: 107 additions & 0 deletions Schema Tests/util/streams/WritableStreamTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System;
using System.IO;
using System.Linq;

using NUnit.Framework;

namespace schema.util.streams {
public class WritableStreamTests {
[Test]
public void TestPosition() {
var ms = new MemoryStream(new byte[] { 1, 2, 3 });
var ws = new WritableStream(ms);

Assert.AreEqual(0, ms.Position);
Assert.AreEqual(0, ws.Position);

ws.Position = 2;
Assert.AreEqual(2, ms.Position);
Assert.AreEqual(2, ws.Position);
}

[Test]
public void TestLength() {
var ms = new MemoryStream(new byte[] { 1, 2, 3 });
var ws = new WritableStream(ms);

Assert.AreEqual(3, ms.Length);
Assert.AreEqual(3, ws.Length);
}

[Test]
public void TestWriteByte() {
var data = new byte[] { 1, 2, 3 };
var ms = new MemoryStream(data);
var ws = new WritableStream(ms);

Assert.AreEqual(0, ms.Position);
Assert.AreEqual(0, ws.Position);

ws.WriteByte(5);
Assert.AreEqual(5, data[0]);
Assert.AreEqual(1, ms.Position);
Assert.AreEqual(1, ws.Position);

ws.WriteByte(6);
Assert.AreEqual(6, data[1]);
Assert.AreEqual(2, ms.Position);
Assert.AreEqual(2, ws.Position);
}

[Test]
public void TestWriteSpan() {
var ms = new MemoryStream();
var ws = new WritableStream(ms);

Assert.AreEqual(0, ms.Position);
Assert.AreEqual(0, ws.Position);

ReadOnlySpan<byte> span = stackalloc byte[5] { 5, 6, 7, 8, 9 };

ws.Write(span);
Assert.AreEqual(5, ms.Position);
Assert.AreEqual(5, ws.Position);

CollectionAssert.AreEqual(new[] { 5, 6, 7, 8, 9 }, ms.ToArray());
}

[Test]
public void TestWriteReadableStream() {
var ms = new MemoryStream();
var ws = new WritableStream(ms);
var rs = new ReadableStream(new byte[] { 5, 6, 7, 8, 9 });

Assert.AreEqual(0, ms.Position);
Assert.AreEqual(0, ws.Position);
Assert.AreEqual(0, rs.Position);

ws.Write(rs);
Assert.AreEqual(rs.Length, ms.Position);
Assert.AreEqual(rs.Length, ws.Position);
Assert.AreEqual(rs.Length, rs.Position);

CollectionAssert.AreEqual(new[] { 5, 6, 7, 8, 9 }, ms.ToArray());
}

[Test]
public void TestWriteLongReadableStream() {
var readData =
Enumerable.Range(0, 300_000).Select(i => (byte) i).ToArray();

var ms = new MemoryStream();
var ws = new WritableStream(ms);
var rs = new ReadableStream(readData);

Assert.AreEqual(0, ms.Position);
Assert.AreEqual(0, ws.Position);
Assert.AreEqual(0, rs.Position);

ws.Write(rs);
Assert.AreEqual(readData.Length, ms.Position);
Assert.AreEqual(readData.Length, ws.Position);
Assert.AreEqual(readData.Length, rs.Position);

CollectionAssert.AreEqual(readData, ms.ToArray());
}
}
}
1 change: 1 addition & 0 deletions Schema/src/util/streams/Interfaces.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public interface IReadableStream {

public interface IWritableStream {
void Write(ReadOnlySpan<byte> src);
void Write(IReadableStream readableStream);
}

public interface ISizedReadableStream : IReadableStream,
Expand Down
16 changes: 9 additions & 7 deletions Schema/src/util/streams/ReadableStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,29 @@

namespace schema.util.streams {
public class ReadableStream : ISeekableReadableStream {
private readonly Stream impl_;
internal Stream Impl { get; }

public static implicit operator ReadableStream(Stream impl) => new(impl);

public ReadableStream(Stream impl) {
if (!impl.CanRead) {
throw new ArgumentException(nameof(impl));
}

this.impl_ = impl;
this.Impl = impl;
}

public ReadableStream(byte[] impl) : this(new MemoryStream(impl)) { }

public void Dispose() => this.impl_.Dispose();
public void Dispose() => this.Impl.Dispose();

public long Position {
get => this.impl_.Position;
set => this.impl_.Position = value;
get => this.Impl.Position;
set => this.Impl.Position = value;
}

public long Length => this.impl_.Length;
public long Length => this.Impl.Length;

public int Read(Span<byte> dst) => this.impl_.Read(dst);
public int Read(Span<byte> dst) => this.Impl.Read(dst);
}
}
23 changes: 23 additions & 0 deletions Schema/src/util/streams/WritableStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@

namespace schema.util.streams {
public class WritableStream(Stream impl) : ISeekableWritableStream {
/// <summary>
/// (Straight-up copied from the implementation of Stream.CopyTo())
/// We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
/// The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
/// improvement in Copy performance.
/// </summary>
private const int DEFAULT_COPY_BUFFER_SIZE = 81920;

public static implicit operator WritableStream(Stream impl) => new(impl);

public void Dispose() => impl.Dispose();

public long Position {
Expand All @@ -15,5 +25,18 @@ public long Position {
public long Length => impl.Length;

public void Write(ReadOnlySpan<byte> src) => impl.Write(src);

public void Write(IReadableStream readableStream) {
if (readableStream is ReadableStream readableStreamImpl) {
readableStreamImpl.Impl.CopyTo(impl);
return;
}

Span<byte> buffer = stackalloc byte[DEFAULT_COPY_BUFFER_SIZE];
int bytesRead;
while ((bytesRead = readableStream.Read(buffer)) != 0) {
impl.Write(buffer.Slice(0, bytesRead));
}
}
}
}

0 comments on commit 5adb318

Please sign in to comment.