Skip to content

Commit

Permalink
Update pre-allocated buffers.
Browse files Browse the repository at this point in the history
  • Loading branch information
xinchen10 committed Jan 13, 2021
1 parent 66aa029 commit 3b40ad8
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 65 deletions.
4 changes: 2 additions & 2 deletions Microsoft.Azure.Amqp/Amqp/Transport/TcpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,8 @@ sealed class BufferSizeTracker
// unitSize is the min value to increase to help small messages to reach level 1
// level is changed only when the trend is consistent during two consecutive windows
// bufferSizes: must match the preallocated buffers in InternalBufferManager.PreallocatedBufferManager
static long durationTicks = TimeSpan.FromSeconds(4).Ticks;
static int[] thresholds = new int[] { 0, 8 * 1024, 4 * 1024 * 1024 };
static long durationTicks = TimeSpan.FromSeconds(2).Ticks;
static int[] thresholds = new int[] { 0, 4 * 1024, 2 * 1024 * 1024 };
static int[] bufferSizes = new int[] { 0, 8 * 1024, 64 * 1024 };
int unitSize;
DateTime firstOperation;
Expand Down
51 changes: 14 additions & 37 deletions Microsoft.Azure.Amqp/InternalBufferManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ namespace Microsoft.Azure.Amqp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Runtime.InteropServices;
using System.Collections.Concurrent;

abstract class InternalBufferManager
Expand Down Expand Up @@ -49,47 +48,36 @@ public static byte[] AllocateByteArray(int size)

sealed class PreallocatedBufferManager : InternalBufferManager
{
readonly int maxBufferSize;
readonly int medBufferSize;
readonly int largeBufferSize;
readonly int smallBufferSize;
readonly ConcurrentStack<byte[]> freeSmallBuffers;
readonly ConcurrentStack<byte[]> freeMedianBuffers;
readonly ConcurrentStack<byte[]> freeLargeBuffers;
byte[][] buffersList;

internal PreallocatedBufferManager(long maxMemoryToPool)
{
// Buffer sizes are fixed.
this.maxBufferSize = 64 * 1024;
this.medBufferSize = 8 * 1024;
this.smallBufferSize = 1024;
// Buffer sizes are fixed. For default maxPool 48MB,
// 64KB (L) = 48, 8KB (S) = 5760
this.largeBufferSize = 64 * 1024;
this.smallBufferSize = 8 * 1024;

long numLargeBuffers = 128;
long numSmallBuffers = 4096;
long medMemorySize = maxMemoryToPool - (numLargeBuffers * this.maxBufferSize) - (numSmallBuffers * this.smallBufferSize);
long numMedBuffers = medMemorySize / this.medBufferSize;
long numBuffers = numLargeBuffers + numMedBuffers + numSmallBuffers;
long largeBufferMemory = maxMemoryToPool / 16;
long numLargeBuffers = largeBufferMemory / this.largeBufferSize;
long numSmallBuffers = (maxMemoryToPool - largeBufferMemory) / this.smallBufferSize;
long numBuffers = numLargeBuffers + numSmallBuffers;

this.buffersList = new byte[numBuffers][];
this.freeSmallBuffers = new ConcurrentStack<byte[]>();
this.freeMedianBuffers = new ConcurrentStack<byte[]>();
this.freeLargeBuffers = new ConcurrentStack<byte[]>();

int lastLarge = 0;
for (int i = 0; i < numLargeBuffers; i++, lastLarge++)
{
buffersList[i] = new byte[maxBufferSize];
buffersList[i] = new byte[largeBufferSize];
this.freeLargeBuffers.Push(buffersList[i]);
}

int lastMed = lastLarge;
for (int i = lastLarge; i < numMedBuffers + lastLarge; i++, lastMed++)
{
buffersList[i] = new byte[this.medBufferSize];
this.freeMedianBuffers.Push(buffersList[i]);
}

for (int i = lastMed; i < numSmallBuffers + lastMed; i++)
for (int i = lastLarge; i < numSmallBuffers + lastLarge; i++)
{
buffersList[i] = new byte[this.smallBufferSize];
this.freeSmallBuffers.Push(buffersList[i]);
Expand All @@ -98,7 +86,7 @@ internal PreallocatedBufferManager(long maxMemoryToPool)

public override byte[] TakeBuffer(int bufferSize)
{
if (bufferSize > this.maxBufferSize)
if (bufferSize > this.largeBufferSize)
{
return null;
}
Expand All @@ -110,12 +98,6 @@ public override byte[] TakeBuffer(int bufferSize)
return returnedBuffer;
}

if (bufferSize <= this.medBufferSize)
{
this.freeMedianBuffers.TryPop(out returnedBuffer);
return returnedBuffer;
}

this.freeLargeBuffers.TryPop(out returnedBuffer);
return returnedBuffer;
}
Expand All @@ -126,15 +108,11 @@ public override byte[] TakeBuffer(int bufferSize)
/// <param name="buffer"></param>
public override void ReturnBuffer(byte[] buffer)
{
if (buffer.Length <= this.smallBufferSize)
if (buffer.Length == this.smallBufferSize)
{
this.freeSmallBuffers.Push(buffer);
}
else if (buffer.Length <= this.medBufferSize)
{
this.freeMedianBuffers.Push(buffer);
}
else
else if (buffer.Length == this.largeBufferSize)
{
this.freeLargeBuffers.Push(buffer);
}
Expand All @@ -144,7 +122,6 @@ public override void Clear()
{
this.buffersList = null;
this.freeSmallBuffers.Clear();
this.freeMedianBuffers.Clear();
this.freeLargeBuffers.Clear();
}
}
Expand Down
80 changes: 54 additions & 26 deletions test/TestCases/AmqpTransportTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
{
using System;
using System.Diagnostics;
using System.Threading;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using global::Microsoft.Azure.Amqp;
using global::Microsoft.Azure.Amqp.Transport;
using Xunit;
using System.Net;
using global::Microsoft.Azure.Amqp;

[Trait("Category", TestCategory.Current)]
public class AmqpTransportTests
Expand All @@ -21,31 +21,31 @@ public void TcpTransportTest()
{
const string localHost = "localhost";
const int port = 30888;
var client = AmqpUtils.GetTcpSettings(localHost, port, true);
var server = AmqpUtils.GetTcpSettings(localHost, port, false);
this.RunTransportTest(localHost, port, client, server);
}

TransportTestContext serverContext = new TransportTestContext()
{
MaxNumber = TestMaxNumber,
TransportSettings = AmqpUtils.GetTcpSettings(localHost, port, true)
};

TransportTestContext clientContext = new TransportTestContext()
{
MaxNumber = TestMaxNumber,
TransportSettings = AmqpUtils.GetTcpSettings(localHost, port, false)
};

Thread listenerThread = new Thread(new ParameterizedThreadStart(ListenerThread));
listenerThread.Start(serverContext);

Thread initiatorThread = new Thread(new ParameterizedThreadStart(InitiatorThread));
initiatorThread.Start(clientContext);

listenerThread.Join();
initiatorThread.Join();
[Fact]
public void TcpTransportClientDynamicBufferTest()
{
const string localHost = "localhost";
const int port = 30888;
var client = AmqpUtils.GetTcpSettings(localHost, port, true);
var server = AmqpUtils.GetTcpSettings(localHost, port, false);
client.SendBufferSize = client.ReceiveBufferSize = 0;
this.RunTransportTest(localHost, port, client, server);
}

Debug.WriteLine("TCP transport test completed.");
Assert.True(clientContext.Success);
Assert.True(serverContext.Success);
[Fact]
public void TcpTransportServerDynamicBufferTest()
{
const string localHost = "localhost";
const int port = 30888;
var client = AmqpUtils.GetTcpSettings(localHost, port, true);
var server = AmqpUtils.GetTcpSettings(localHost, port, false);
server.SendBufferSize = server.ReceiveBufferSize = 0;
this.RunTransportTest(localHost, port, client, server);
}

[Fact]
Expand Down Expand Up @@ -80,6 +80,34 @@ public void ConnectTimeoutTest()
}
}

void RunTransportTest(string host, int port, TransportSettings client, TransportSettings server)
{
TransportTestContext serverContext = new TransportTestContext()
{
MaxNumber = TestMaxNumber,
TransportSettings = client
};

TransportTestContext clientContext = new TransportTestContext()
{
MaxNumber = TestMaxNumber,
TransportSettings = server
};

Thread listenerThread = new Thread(new ParameterizedThreadStart(ListenerThread));
listenerThread.Start(serverContext);

Thread initiatorThread = new Thread(new ParameterizedThreadStart(InitiatorThread));
initiatorThread.Start(clientContext);

listenerThread.Join();
initiatorThread.Join();

Debug.WriteLine("TCP transport test completed.");
Assert.True(clientContext.Success);
Assert.True(serverContext.Success);
}

internal static TransportBase AcceptServerTransport(TransportSettings settings)
{
ManualResetEvent complete = new ManualResetEvent(false);
Expand Down

0 comments on commit 3b40ad8

Please sign in to comment.