Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup a few websocket things #3625

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Realm/Realm/Native/PrimitiveValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

using System;
using System.Buffers;
using System.Collections;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.InteropServices;
Expand Down
44 changes: 12 additions & 32 deletions Realm/Realm/Native/SyncSocketProvider.EventLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
////////////////////////////////////////////////////////////////////////////

using System;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -48,56 +47,37 @@ internal void Cancel()
_cts.Dispose();
}

private class Work : IWork
private class Work(IntPtr nativeCallback, CancellationToken cancellationToken)
: IWork
{
private readonly IntPtr _nativeCallback;
private readonly CancellationToken _cancellationToken;

public Work(IntPtr nativeCallback, CancellationToken cancellationToken)
{
_nativeCallback = nativeCallback;
_cancellationToken = cancellationToken;
}

public void Execute()
{
var status = Status.OK;
if (_cancellationToken.IsCancellationRequested)
if (cancellationToken.IsCancellationRequested)
{
status = new(ErrorCode.OperationAborted, "Timer canceled");
}

RunCallback(_nativeCallback, status);
RunCallback(nativeCallback, status);
}
}
}

private class EventLoopWork : IWork
// Belongs to SyncSocketProvider. When Native destroys the Provider we need to stop executing
// enqueued work, but we need to release all the callbacks we copied on the heap.
private class EventLoopWork(IntPtr nativeCallback, CancellationToken cancellationToken)
: IWork
{
private readonly IntPtr _nativeCallback;
private readonly Status _status;

// Belongs to SyncSocketProvider. When Native destroys the Provider we need to stop executing
// enqueued work, but we need to release all the callbacks we copied on the heap.
private readonly CancellationToken _cancellationToken;

public EventLoopWork(IntPtr nativeCallback, Status status, CancellationToken cancellationToken)
{
_nativeCallback = nativeCallback;
_status = status;
_cancellationToken = cancellationToken;
}

public void Execute()
{
if (_cancellationToken.IsCancellationRequested)
if (cancellationToken.IsCancellationRequested)
{
Logger.LogDefault(LogLevel.Trace, "Deleting EventLoopWork callback only because event loop was cancelled.");
NativeMethods.delete_callback(_nativeCallback);
NativeMethods.delete_callback(nativeCallback);
return;
}

RunCallback(_nativeCallback, _status);
RunCallback(nativeCallback, Status.OK);
}
}

Expand All @@ -112,7 +92,7 @@ private static void RunCallback(IntPtr nativeCallback, Status status)
private async Task PostWorkAsync(IntPtr nativeCallback)
{
Logger.LogDefault(LogLevel.Trace, "Posting work to SyncSocketProvider event loop.");
await _workQueue.Writer.WriteAsync(new EventLoopWork(nativeCallback, Status.OK, _cts.Token));
await _workQueue.Writer.WriteAsync(new EventLoopWork(nativeCallback, _cts.Token));
}

private async partial Task WorkThread()
Expand Down
76 changes: 21 additions & 55 deletions Realm/Realm/Native/SyncSocketProvider.WebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
using System.Buffers;
using System.IO;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Channels;
Expand Down Expand Up @@ -68,7 +67,7 @@ private async Task ReadThread()
{
var builder = new StringBuilder();
FormatExceptionForLogging(e, builder);
Logger.LogDefault(LogLevel.Error, "Error establishing WebSocket connection " + builder.ToString());
Logger.LogDefault(LogLevel.Error, $"Error establishing WebSocket connection {builder}");

await _workQueue.WriteAsync(new WebSocketClosedWork(false, (WebSocketCloseStatus)RLM_ERR_WEBSOCKET_CONNECTION_FAILED, e.Message, _observer, _cancellationToken));
return;
Expand All @@ -86,9 +85,9 @@ private async Task ReadThread()
await _receiveBuffer.WriteAsync(buffer, 0, result.Count);
if (result.EndOfMessage)
{
var current_buffer = _receiveBuffer;
var currentBuffer = _receiveBuffer;
_receiveBuffer = new MemoryStream();
await _workQueue.WriteAsync(new BinaryMessageReceivedWork(current_buffer, _observer, _cancellationToken));
await _workQueue.WriteAsync(new BinaryMessageReceivedWork(currentBuffer, _observer, _cancellationToken));
}

break;
Expand All @@ -105,7 +104,7 @@ private async Task ReadThread()
{
var builder = new StringBuilder();
FormatExceptionForLogging(e, builder);
Logger.LogDefault(LogLevel.Error, "Error reading from WebSocket " + builder.ToString());
Logger.LogDefault(LogLevel.Error, $"Error reading from WebSocket {builder}");

await _workQueue.WriteAsync(new WebSocketClosedWork(false, (WebSocketCloseStatus)RLM_ERR_WEBSOCKET_READ_ERROR, e.Message, _observer, _cancellationToken));
return;
Expand All @@ -123,7 +122,6 @@ public async void Write(BinaryValue data, IntPtr native_callback)

var buffer = data.AsBytes(usePooledArray: true);

var status = Status.OK;
try
{
await _webSocket.SendAsync(new(buffer), WebSocketMessageType.Binary, true, _cancellationToken);
Expand All @@ -132,7 +130,7 @@ public async void Write(BinaryValue data, IntPtr native_callback)
{
var builder = new StringBuilder();
FormatExceptionForLogging(e, builder);
Logger.LogDefault(LogLevel.Error, "Error writing to WebSocket " + builder.ToString());
Logger.LogDefault(LogLevel.Error, $"Error writing to WebSocket {builder}");

// in case of errors notify the websocket observer and just dispose the callback
await _workQueue.WriteAsync(new WebSocketClosedWork(false, (WebSocketCloseStatus)RLM_ERR_WEBSOCKET_WRITE_ERROR, e.Message, _observer, _cancellationToken));
Expand All @@ -144,7 +142,7 @@ public async void Write(BinaryValue data, IntPtr native_callback)
ArrayPool<byte>.Shared.Return(buffer);
}

await _workQueue.WriteAsync(new EventLoopWork(native_callback, status, _cancellationToken));
await _workQueue.WriteAsync(new EventLoopWork(native_callback, _cancellationToken));
}

public async void Dispose()
Expand Down Expand Up @@ -201,98 +199,66 @@ private static void FormatExceptionForLogging(Exception ex, StringBuilder builde
FormatExceptionForLogging(inner, builder, nesting + 1);
}
}
else if (ex.InnerException is Exception inner)
else if (ex.InnerException is { } inner)
{
FormatExceptionForLogging(inner, builder, nesting + 1);
}
}
}

private abstract class WebSocketWork : IWork
private abstract class WebSocketWork(IntPtr observer, CancellationToken cancellationToken)
: IWork
{
private readonly IntPtr _observer;

// Belongs to the Socket and canceled when Native destroys the socket.
// If it's canceled we shouldn't call any observer methods.
private readonly CancellationToken _cancellationToken;

protected WebSocketWork(IntPtr observer, CancellationToken cancellationToken)
{
_observer = observer;
_cancellationToken = cancellationToken;
}
private readonly CancellationToken _cancellationToken = cancellationToken;

protected abstract void Execute(IntPtr observer);

void IWork.Execute()
{
if (!_cancellationToken.IsCancellationRequested)
{
Execute(_observer);
Execute(observer);
}
}
}

private sealed class WebSocketConnectedWork : WebSocketWork
private sealed class WebSocketConnectedWork(string protocol, IntPtr observer, CancellationToken cancellationToken)
: WebSocketWork(observer, cancellationToken)
{
private readonly string _protocol;

public WebSocketConnectedWork(string protocol, IntPtr observer, CancellationToken cancellationToken)
: base(observer, cancellationToken)
{
_protocol = protocol;
}

protected override void Execute(IntPtr observer)
{
using var arena = new Arena();
NativeMethods.observer_connected_handler(observer, StringValue.AllocateFrom(_protocol, arena));
NativeMethods.observer_connected_handler(observer, StringValue.AllocateFrom(protocol, arena));
}
}

private sealed class BinaryMessageReceivedWork : WebSocketWork
private sealed class BinaryMessageReceivedWork(MemoryStream receiveBuffer, IntPtr observer, CancellationToken cancellationToken)
: WebSocketWork(observer, cancellationToken)
{
private readonly MemoryStream _receiveBuffer;

public BinaryMessageReceivedWork(MemoryStream receiveBuffer, IntPtr observer, CancellationToken cancellationToken)
: base(observer, cancellationToken)
{
_receiveBuffer = receiveBuffer;
}

protected unsafe override void Execute(IntPtr observer)
{
using var buffer = _receiveBuffer;
using var buffer = receiveBuffer;
fixed (byte* data = buffer.GetBuffer())
{
NativeMethods.observer_binary_message_received(observer, new() { data = data, size = (IntPtr)buffer.Length });
}
}
}

private sealed class WebSocketClosedWork : WebSocketWork
private sealed class WebSocketClosedWork(bool clean, WebSocketCloseStatus status, string description, IntPtr observer, CancellationToken cancellationToken)
: WebSocketWork(observer, cancellationToken)
{
private readonly bool _clean;
private readonly WebSocketCloseStatus _status;
private readonly string _description;

public WebSocketClosedWork(bool clean, WebSocketCloseStatus status, string description, IntPtr observer, CancellationToken cancellationToken)
: base(observer, cancellationToken)
{
_clean = clean;
_status = status;
_description = description;
}

protected override void Execute(IntPtr observer)
{
if (!_clean)
if (!clean)
{
NativeMethods.observer_error_handler(observer);
}

using var arena = new Arena();
NativeMethods.observer_closed_handler(observer, _clean, _status, StringValue.AllocateFrom(_description, arena));
NativeMethods.observer_closed_handler(observer, clean, status, StringValue.AllocateFrom(description, arena));
}
}
}
30 changes: 13 additions & 17 deletions Realm/Realm/Native/SyncSocketProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,18 @@ private static IntPtr WebSocketConnect(IntPtr managed_provider, IntPtr observer,

provider._onWebSocketConnection?.Invoke(webSocket.Options);

var builder = new UriBuilder();
builder.Scheme = endpoint.is_ssl ? "wss" : "ws";
builder.Host = endpoint.address;
builder.Port = endpoint.port;
var builder = new UriBuilder
{
Scheme = endpoint.is_ssl ? "wss" : "ws",
Host = endpoint.address!,
Port = endpoint.port
};

if (endpoint.path)
{
var pathAndQuery = ((string)endpoint.path)!.Split('?');
builder.Path = pathAndQuery.ElementAtOrDefault(0);
builder.Query = pathAndQuery.ElementAtOrDefault(1);
builder.Path = pathAndQuery.ElementAtOrDefault(0) ?? string.Empty;
builder.Query = pathAndQuery.ElementAtOrDefault(1) ?? String.Empty;
}

var socket = new Socket(webSocket, observer, provider._workQueue, builder.Uri);
Expand Down Expand Up @@ -128,18 +131,11 @@ static SyncSocketProvider()
NativeMethods.install_callbacks(post, dispose, create_timer, cancel_timer, websocket_connect, websocket_write, websocket_close);
}

private struct Status
private struct Status(ErrorCode code, string? reason)
{
internal ErrorCode Code;
internal string? Reason;
internal static readonly Status OperationAborted = new(ErrorCode.OperationAborted, "Operation canceled");
internal static readonly Status OK = new() { Code = ErrorCode.Ok };

public Status(ErrorCode code, string reason)
{
Code = code;
Reason = reason;
}
internal readonly string? Reason = reason;
internal readonly ErrorCode Code = code;
internal static readonly Status OK = new(ErrorCode.Ok, null);
}

/// <summary>
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading