From bba37d450a9af5e581ce84d7deb9a8d9eb1a4f1e Mon Sep 17 00:00:00 2001 From: Ruben Buniatyan Date: Wed, 6 Nov 2024 22:10:31 +0100 Subject: [PATCH] Fix IPC data handling (#7714) --- .../JsonRpcSocketsClientTests.cs | 53 ++++++----- .../Properties/launchSettings.json | 8 ++ .../IpcSocketMessageStream.cs | 93 +++++++++---------- 3 files changed, 81 insertions(+), 73 deletions(-) diff --git a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs index e011481dbb2..e9640899c9b 100644 --- a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs +++ b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; using System.Net; @@ -63,7 +64,6 @@ public async Task Can_handle_very_large_objects() Assert.That(sent, Is.EqualTo(received)); } - [Test] [TestCase(1)] [TestCase(2)] [TestCase(10)] @@ -80,8 +80,13 @@ static async Task CountNumberOfMessages(Socket socket, CancellationToken to byte[] buffer = new byte[10]; while (true) { - ReceiveResult? result = await stream.ReceiveAsync(buffer).ConfigureAwait(false); - if (result?.EndOfMessage == true) + ReceiveResult? result = await stream.ReceiveAsync(buffer); + + // Imitate random delays + if (Stopwatch.GetTimestamp() % 101 == 0) + await Task.Delay(1); + + if (result is not null && IsEndOfIpcMessage(result)) { messages++; } @@ -108,7 +113,7 @@ static async Task CountNumberOfMessages(Socket socket, CancellationToken to Task sendMessages = Task.Run(async () => { using Socket socket = new(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - await socket.ConnectAsync(ipEndPoint).ConfigureAwait(false); + await socket.ConnectAsync(ipEndPoint); using IpcSocketMessageStream stream = new(socket); using JsonRpcSocketsClient client = new( @@ -123,18 +128,18 @@ static async Task CountNumberOfMessages(Socket socket, CancellationToken to for (int i = 0; i < messageCount; i++) { - using JsonRpcResult result = JsonRpcResult.Single(RandomSuccessResponse(1_000, () => disposeCount++), default); - await client.SendJsonRpcResult(result).ConfigureAwait(false); - await Task.Delay(1).ConfigureAwait(false); + using JsonRpcResult result = JsonRpcResult.Single(RandomSuccessResponse(100, () => disposeCount++), default); + await client.SendJsonRpcResult(result); + await Task.Delay(1); } disposeCount.Should().Be(messageCount); - await cts.CancelAsync().ConfigureAwait(false); + await cts.CancelAsync(); return messageCount; }); - await Task.WhenAll(sendMessages, receiveMessages).ConfigureAwait(false); + await Task.WhenAll(sendMessages, receiveMessages); int sent = sendMessages.Result; int received = receiveMessages.Result; @@ -158,17 +163,17 @@ async Task ReadMessages(Socket socket, IList receivedMessages, Canc byte[] buffer = new byte[bufferSize]; while (true) { - ReceiveResult? result = await stream.ReceiveAsync(buffer).ConfigureAwait(false); + ReceiveResult? result = await stream.ReceiveAsync(buffer); if (result is not null) { msg.AddRange(buffer.Take(result.Read)); - } - if (result?.EndOfMessage == true) - { - messages++; - receivedMessages.Add(msg.ToArray()); - msg = []; + if (IsEndOfIpcMessage(result)) + { + messages++; + receivedMessages.Add(msg.ToArray()); + msg = []; + } } if (result is null || result.Closed) @@ -190,14 +195,14 @@ async Task ReadMessages(Socket socket, IList receivedMessages, Canc Task receiveMessages = OneShotServer( ipEndPoint, - async socket => await ReadMessages(socket, receivedMessages, cts.Token).ConfigureAwait(false) + async socket => await ReadMessages(socket, receivedMessages, cts.Token) ); Task sendMessages = Task.Run(async () => { int messageCount = 0; using Socket socket = new(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - await socket.ConnectAsync(ipEndPoint).ConfigureAwait(false); + await socket.ConnectAsync(ipEndPoint); using IpcSocketMessageStream stream = new(socket); using JsonRpcSocketsClient client = new( @@ -216,20 +221,20 @@ async Task ReadMessages(Socket socket, IList receivedMessages, Canc messageCount++; var msg = Enumerable.Range(11, i).Select(x => (byte)x).ToArray(); sentMessages.Add(msg); - await stream.WriteAsync(msg).ConfigureAwait(false); - await stream.WriteEndOfMessageAsync().ConfigureAwait(false); + await stream.WriteAsync(msg.Append((byte)'\n').ToArray()); + if (i % 10 == 0) { - await Task.Delay(1).ConfigureAwait(false); + await Task.Delay(1); } } stream.Close(); - await cts.CancelAsync().ConfigureAwait(false); + await cts.CancelAsync(); return messageCount; }); - await Task.WhenAll(sendMessages, receiveMessages).ConfigureAwait(false); + await Task.WhenAll(sendMessages, receiveMessages); int sent = sendMessages.Result; int received = receiveMessages.Result; @@ -551,4 +556,6 @@ private static string RandomString(int length) } return new string(stringChars); } + + private static bool IsEndOfIpcMessage(ReceiveResult result) => result.EndOfMessage && (!result.Closed || result.Read != 0); } diff --git a/src/Nethermind/Nethermind.Runner/Properties/launchSettings.json b/src/Nethermind/Nethermind.Runner/Properties/launchSettings.json index 34ae30538e9..b576593a76c 100644 --- a/src/Nethermind/Nethermind.Runner/Properties/launchSettings.json +++ b/src/Nethermind/Nethermind.Runner/Properties/launchSettings.json @@ -122,6 +122,14 @@ "Docker": { "commandName": "Docker", "commandLineArgs": "-c holesky --data-dir .data /data --jsonrpc-enginehost 0.0.0.0 --jsonrpc-engineport 8551 --jsonrpc-host 0.0.0.0" + }, + "WSL": { + "commandName": "WSL", + "commandLineArgs": "\"{OutDir}/nethermind.dll\" -c holesky --data-dir .data", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + }, + "distributionName": "" } } } diff --git a/src/Nethermind/Nethermind.Sockets/IpcSocketMessageStream.cs b/src/Nethermind/Nethermind.Sockets/IpcSocketMessageStream.cs index 2ecedcda179..0dd6c90748a 100644 --- a/src/Nethermind/Nethermind.Sockets/IpcSocketMessageStream.cs +++ b/src/Nethermind/Nethermind.Sockets/IpcSocketMessageStream.cs @@ -11,76 +11,69 @@ namespace Nethermind.Sockets; public class IpcSocketMessageStream(Socket socket) : NetworkStream(socket), IMessageBorderPreservingStream { - private static readonly byte Delimiter = Convert.ToByte('\n'); + private const byte Delimiter = (byte)'\n'; - public byte[] bufferedData = []; - public int bufferedDataLength = 0; + private byte[] _bufferedData = []; + private int _bufferedDataLength = 0; public async Task ReceiveAsync(ArraySegment buffer) { - ReceiveResult? result = null; - if (Socket.Connected) + if (!Socket.Connected) + return null; + + if (_bufferedDataLength > 0) { - if (bufferedDataLength > 0) + if (_bufferedDataLength > buffer.Count) + throw new NotSupportedException($"Passed {nameof(buffer)} should be larger than internal one"); + + try { - if (bufferedDataLength > buffer.Count) - { - throw new NotSupportedException($"Passed {nameof(buffer)} should be larger than internal one"); - } - try - { - Buffer.BlockCopy(bufferedData, 0, buffer.Array!, buffer.Offset, bufferedDataLength); - } - catch (Exception) - { - - } + Buffer.BlockCopy(_bufferedData, 0, buffer.Array!, buffer.Offset, _bufferedDataLength); } + catch { } + } - int read = bufferedDataLength + await Socket.ReceiveAsync(buffer[bufferedDataLength..], SocketFlags.None); + int read = _bufferedDataLength + await Socket.ReceiveAsync(buffer[_bufferedDataLength..], SocketFlags.None); - int delimiter = ((IList)buffer[..read]).IndexOf(Delimiter); + int delimiter = ((IList)buffer[..read]).IndexOf(Delimiter); + bool endOfMessage; - bool endOfMessage; - if (delimiter != -1 && (delimiter + 1) < read) - { - bufferedDataLength = read - delimiter - 1; - - if (bufferedData.Length < buffer.Count) - { - if (bufferedData.Length != 0) - { - ArrayPool.Shared.Return(bufferedData); - } - bufferedData = ArrayPool.Shared.Rent(buffer.Count); - } - endOfMessage = true; - buffer[(delimiter + 1)..read].CopyTo(bufferedData); - read = delimiter + 1; - } - else + if (delimiter != -1 && (delimiter + 1) < read) + { + _bufferedDataLength = read - delimiter - 1; + + if (_bufferedData.Length < buffer.Count) { - endOfMessage = delimiter != -1; - bufferedDataLength = 0; + if (_bufferedData.Length != 0) + ArrayPool.Shared.Return(_bufferedData); + + _bufferedData = ArrayPool.Shared.Rent(buffer.Count); } - result = new ReceiveResult() - { - Closed = read == 0, - Read = read > 0 && buffer[read - 1] == Delimiter ? read - 1 : read, - EndOfMessage = endOfMessage, - CloseStatusDescription = null - }; + endOfMessage = true; + buffer[(delimiter + 1)..read].CopyTo(_bufferedData); + read = delimiter + 1; + } + else + { + endOfMessage = delimiter != -1 || Socket.Available == 0; + _bufferedDataLength = 0; } - return result; + return new() + { + Closed = read == 0, + Read = read > 0 && buffer[read - 1] == Delimiter ? read - 1 : read, + EndOfMessage = endOfMessage, + CloseStatusDescription = null + }; } protected override void Dispose(bool disposing) { - if (disposing && bufferedData.Length != 0) + if (disposing && _bufferedData.Length != 0) { - ArrayPool.Shared.Return(bufferedData); + ArrayPool.Shared.Return(_bufferedData); } base.Dispose(disposing); }