From 3b40ad84644e8a0580a8ae5774ff97a43a343146 Mon Sep 17 00:00:00 2001 From: xinchen Date: Wed, 13 Jan 2021 12:32:35 -0800 Subject: [PATCH] Update pre-allocated buffers. --- .../Amqp/Transport/TcpTransport.cs | 4 +- Microsoft.Azure.Amqp/InternalBufferManager.cs | 51 ++++-------- test/TestCases/AmqpTransportTests.cs | 80 +++++++++++++------ 3 files changed, 70 insertions(+), 65 deletions(-) diff --git a/Microsoft.Azure.Amqp/Amqp/Transport/TcpTransport.cs b/Microsoft.Azure.Amqp/Amqp/Transport/TcpTransport.cs index 4b33e16c..00dbd7af 100644 --- a/Microsoft.Azure.Amqp/Amqp/Transport/TcpTransport.cs +++ b/Microsoft.Azure.Amqp/Amqp/Transport/TcpTransport.cs @@ -489,8 +489,8 @@ sealed class BufferSizeTracker // unitSize is the min value to increase to help small messages to reach level 1 // level is changed only when the trend is consistent during two consecutive windows // bufferSizes: must match the preallocated buffers in InternalBufferManager.PreallocatedBufferManager - static long durationTicks = TimeSpan.FromSeconds(4).Ticks; - static int[] thresholds = new int[] { 0, 8 * 1024, 4 * 1024 * 1024 }; + static long durationTicks = TimeSpan.FromSeconds(2).Ticks; + static int[] thresholds = new int[] { 0, 4 * 1024, 2 * 1024 * 1024 }; static int[] bufferSizes = new int[] { 0, 8 * 1024, 64 * 1024 }; int unitSize; DateTime firstOperation; diff --git a/Microsoft.Azure.Amqp/InternalBufferManager.cs b/Microsoft.Azure.Amqp/InternalBufferManager.cs index 285ca297..446ec371 100644 --- a/Microsoft.Azure.Amqp/InternalBufferManager.cs +++ b/Microsoft.Azure.Amqp/InternalBufferManager.cs @@ -6,7 +6,6 @@ namespace Microsoft.Azure.Amqp using System; using System.Collections.Generic; using System.Threading; - using System.Runtime.InteropServices; using System.Collections.Concurrent; abstract class InternalBufferManager @@ -49,47 +48,36 @@ public static byte[] AllocateByteArray(int size) sealed class PreallocatedBufferManager : InternalBufferManager { - readonly int maxBufferSize; - readonly int medBufferSize; + readonly int largeBufferSize; readonly int smallBufferSize; readonly ConcurrentStack freeSmallBuffers; - readonly ConcurrentStack freeMedianBuffers; readonly ConcurrentStack freeLargeBuffers; byte[][] buffersList; internal PreallocatedBufferManager(long maxMemoryToPool) { - // Buffer sizes are fixed. - this.maxBufferSize = 64 * 1024; - this.medBufferSize = 8 * 1024; - this.smallBufferSize = 1024; + // Buffer sizes are fixed. For default maxPool 48MB, + // 64KB (L) = 48, 8KB (S) = 5760 + this.largeBufferSize = 64 * 1024; + this.smallBufferSize = 8 * 1024; - long numLargeBuffers = 128; - long numSmallBuffers = 4096; - long medMemorySize = maxMemoryToPool - (numLargeBuffers * this.maxBufferSize) - (numSmallBuffers * this.smallBufferSize); - long numMedBuffers = medMemorySize / this.medBufferSize; - long numBuffers = numLargeBuffers + numMedBuffers + numSmallBuffers; + long largeBufferMemory = maxMemoryToPool / 16; + long numLargeBuffers = largeBufferMemory / this.largeBufferSize; + long numSmallBuffers = (maxMemoryToPool - largeBufferMemory) / this.smallBufferSize; + long numBuffers = numLargeBuffers + numSmallBuffers; this.buffersList = new byte[numBuffers][]; this.freeSmallBuffers = new ConcurrentStack(); - this.freeMedianBuffers = new ConcurrentStack(); this.freeLargeBuffers = new ConcurrentStack(); int lastLarge = 0; for (int i = 0; i < numLargeBuffers; i++, lastLarge++) { - buffersList[i] = new byte[maxBufferSize]; + buffersList[i] = new byte[largeBufferSize]; this.freeLargeBuffers.Push(buffersList[i]); } - int lastMed = lastLarge; - for (int i = lastLarge; i < numMedBuffers + lastLarge; i++, lastMed++) - { - buffersList[i] = new byte[this.medBufferSize]; - this.freeMedianBuffers.Push(buffersList[i]); - } - - for (int i = lastMed; i < numSmallBuffers + lastMed; i++) + for (int i = lastLarge; i < numSmallBuffers + lastLarge; i++) { buffersList[i] = new byte[this.smallBufferSize]; this.freeSmallBuffers.Push(buffersList[i]); @@ -98,7 +86,7 @@ internal PreallocatedBufferManager(long maxMemoryToPool) public override byte[] TakeBuffer(int bufferSize) { - if (bufferSize > this.maxBufferSize) + if (bufferSize > this.largeBufferSize) { return null; } @@ -110,12 +98,6 @@ public override byte[] TakeBuffer(int bufferSize) return returnedBuffer; } - if (bufferSize <= this.medBufferSize) - { - this.freeMedianBuffers.TryPop(out returnedBuffer); - return returnedBuffer; - } - this.freeLargeBuffers.TryPop(out returnedBuffer); return returnedBuffer; } @@ -126,15 +108,11 @@ public override byte[] TakeBuffer(int bufferSize) /// public override void ReturnBuffer(byte[] buffer) { - if (buffer.Length <= this.smallBufferSize) + if (buffer.Length == this.smallBufferSize) { this.freeSmallBuffers.Push(buffer); } - else if (buffer.Length <= this.medBufferSize) - { - this.freeMedianBuffers.Push(buffer); - } - else + else if (buffer.Length == this.largeBufferSize) { this.freeLargeBuffers.Push(buffer); } @@ -144,7 +122,6 @@ public override void Clear() { this.buffersList = null; this.freeSmallBuffers.Clear(); - this.freeMedianBuffers.Clear(); this.freeLargeBuffers.Clear(); } } diff --git a/test/TestCases/AmqpTransportTests.cs b/test/TestCases/AmqpTransportTests.cs index 33069bff..b02eec6d 100644 --- a/test/TestCases/AmqpTransportTests.cs +++ b/test/TestCases/AmqpTransportTests.cs @@ -2,12 +2,12 @@ { using System; using System.Diagnostics; - using System.Threading; + using System.Net; using System.Net.Sockets; + using System.Threading; + using global::Microsoft.Azure.Amqp; using global::Microsoft.Azure.Amqp.Transport; using Xunit; - using System.Net; - using global::Microsoft.Azure.Amqp; [Trait("Category", TestCategory.Current)] public class AmqpTransportTests @@ -21,31 +21,31 @@ public void TcpTransportTest() { const string localHost = "localhost"; const int port = 30888; + var client = AmqpUtils.GetTcpSettings(localHost, port, true); + var server = AmqpUtils.GetTcpSettings(localHost, port, false); + this.RunTransportTest(localHost, port, client, server); + } - TransportTestContext serverContext = new TransportTestContext() - { - MaxNumber = TestMaxNumber, - TransportSettings = AmqpUtils.GetTcpSettings(localHost, port, true) - }; - - TransportTestContext clientContext = new TransportTestContext() - { - MaxNumber = TestMaxNumber, - TransportSettings = AmqpUtils.GetTcpSettings(localHost, port, false) - }; - - Thread listenerThread = new Thread(new ParameterizedThreadStart(ListenerThread)); - listenerThread.Start(serverContext); - - Thread initiatorThread = new Thread(new ParameterizedThreadStart(InitiatorThread)); - initiatorThread.Start(clientContext); - - listenerThread.Join(); - initiatorThread.Join(); + [Fact] + public void TcpTransportClientDynamicBufferTest() + { + const string localHost = "localhost"; + const int port = 30888; + var client = AmqpUtils.GetTcpSettings(localHost, port, true); + var server = AmqpUtils.GetTcpSettings(localHost, port, false); + client.SendBufferSize = client.ReceiveBufferSize = 0; + this.RunTransportTest(localHost, port, client, server); + } - Debug.WriteLine("TCP transport test completed."); - Assert.True(clientContext.Success); - Assert.True(serverContext.Success); + [Fact] + public void TcpTransportServerDynamicBufferTest() + { + const string localHost = "localhost"; + const int port = 30888; + var client = AmqpUtils.GetTcpSettings(localHost, port, true); + var server = AmqpUtils.GetTcpSettings(localHost, port, false); + server.SendBufferSize = server.ReceiveBufferSize = 0; + this.RunTransportTest(localHost, port, client, server); } [Fact] @@ -80,6 +80,34 @@ public void ConnectTimeoutTest() } } + void RunTransportTest(string host, int port, TransportSettings client, TransportSettings server) + { + TransportTestContext serverContext = new TransportTestContext() + { + MaxNumber = TestMaxNumber, + TransportSettings = client + }; + + TransportTestContext clientContext = new TransportTestContext() + { + MaxNumber = TestMaxNumber, + TransportSettings = server + }; + + Thread listenerThread = new Thread(new ParameterizedThreadStart(ListenerThread)); + listenerThread.Start(serverContext); + + Thread initiatorThread = new Thread(new ParameterizedThreadStart(InitiatorThread)); + initiatorThread.Start(clientContext); + + listenerThread.Join(); + initiatorThread.Join(); + + Debug.WriteLine("TCP transport test completed."); + Assert.True(clientContext.Success); + Assert.True(serverContext.Success); + } + internal static TransportBase AcceptServerTransport(TransportSettings settings) { ManualResetEvent complete = new ManualResetEvent(false);