Skip to content

Commit

Permalink
Support link operation timeout in drain operation.
Browse files Browse the repository at this point in the history
  • Loading branch information
xinchen10 committed Jan 11, 2023
1 parent 1eb4e38 commit 9d29d1c
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 40 deletions.
113 changes: 74 additions & 39 deletions Microsoft.Azure.Amqp/Amqp/ReceivingAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public sealed class ReceivingAmqpLink : AmqpLink
WorkCollection<ArraySegment<byte>, DisposeAsyncResult, DeliveryState> pendingDispositions;
AmqpMessage currentMessage;
LinkedList<ReceiveAsyncResult> waiterList;
HashSet<DrainTaskCompletionSource> drainTasks;
HashSet<DrainAsyncResult> drainTasks;

public ReceivingAmqpLink(AmqpLinkSettings settings)
: this(null, settings)
Expand Down Expand Up @@ -273,27 +273,12 @@ public bool EndReceiveMessages(IAsyncResult result, out IEnumerable<AmqpMessage>

public Task DrainAsyc(CancellationToken cancellationToken)
{
var tcs = new DrainTaskCompletionSource(this);
lock (this.SyncRoot)
{
if (this.drainTasks == null)
{
this.drainTasks = new HashSet<DrainTaskCompletionSource>();
}

this.drainTasks.Add(tcs);
if (!this.Drain)
{
this.SendFlow(false, true, null);
}

if (cancellationToken.CanBeCanceled)
{
cancellationToken.Register(s => ((DrainTaskCompletionSource)s).Cancel(), tcs, false);
}
}

return tcs.Task;
return Task.Factory.FromAsync(
(thisPtr, k, c, s) => new DrainAsyncResult(thisPtr, thisPtr.OperationTimeout, k, c, s),
r => DrainAsyncResult.End(r),
this,
cancellationToken,
this);
}

public Task<Outcome> DisposeMessageAsync(ArraySegment<byte> deliveryTag, Outcome outcome, bool batchable, TimeSpan timeout)
Expand Down Expand Up @@ -515,7 +500,7 @@ protected override void OnReceiveFlow(Flow flow)
base.OnReceiveFlow(flow);
if (draining && this.LinkCredit == 0)
{
HashSet<DrainTaskCompletionSource> pendingTasks = null;
HashSet<DrainAsyncResult> pendingTasks = null;
lock (this.SyncRoot)
{
pendingTasks = this.drainTasks;
Expand All @@ -526,7 +511,7 @@ protected override void OnReceiveFlow(Flow flow)
{
foreach (var task in pendingTasks)
{
task.TrySetResult(true);
task.Signal(false);
}
}
}
Expand Down Expand Up @@ -898,37 +883,87 @@ private static void RemoveFromWaiterList(ReceiveAsyncResult result)
}
}

sealed class DrainTaskCompletionSource : TaskCompletionSource<bool>
sealed class DrainAsyncResult : TimeoutAsyncResult<string>
{
readonly ReceivingAmqpLink link;

public DrainTaskCompletionSource(ReceivingAmqpLink link)
public DrainAsyncResult(ReceivingAmqpLink link,
TimeSpan timeout,
CancellationToken cancellationToken,
AsyncCallback callback,
object state)
: base(timeout, cancellationToken, callback, state)
{
this.link = link;
this.Start();
}

public void Cancel()
public static void End(IAsyncResult result)
{
AsyncResult.End<DrainAsyncResult>(result);
}

public void Signal(bool isSynchronous)
{
this.CompleteSelf(isSynchronous);
}

public override void Cancel(bool isSynchronous)
{
if (this.Remove())
{
this.CompleteSelf(isSynchronous, new TaskCanceledException());
}
}

protected override string Target
{
get { return "drain"; }
}

protected override void CompleteOnTimer()
{
if (this.Remove())
{
base.CompleteOnTimer();
}
}

void Start()
{
bool setCancel = false;
lock (this.link.SyncRoot)
{
if (this.link.drainTasks != null)
if (this.link.drainTasks == null)
{
if (this.link.drainTasks.Remove(this))
{
setCancel = true;
if (this.link.drainTasks.Count == 0)
{
this.link.drainTasks = null;
}
}
this.link.drainTasks = new HashSet<DrainAsyncResult>();
}

this.link.drainTasks.Add(this);
if (!this.link.Drain)
{
this.link.SendFlow(false, true, null);
}
}

if (setCancel)
this.StartTracking();
}

bool Remove()
{
lock (this.link.SyncRoot)
{
this.TrySetCanceled();
if (this.link.drainTasks == null || !this.link.drainTasks.Remove(this))
{
return false;
}

if (this.link.drainTasks.Count == 0)
{
this.link.drainTasks = null;
}
}

return true;
}
}

Expand Down
91 changes: 90 additions & 1 deletion test/TestCases/CancellationTokenTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,85 @@ await Assert.ThrowsAnyAsync<TaskCanceledException>(async () =>
}
}

[Fact]
public Task LinkDrainTest()
{
return this.RunLinkDrainTest(false);
}

[Fact]
public Task LinkDrainCanceledTest()
{
return this.RunLinkDrainTest(true);
}

[Fact]
public Task LinkDrainTimeoutTest()
{
return this.RunLinkDrainTest(false, 800);
}

async Task RunLinkDrainTest(bool cancelBefore, int timeoutMilliseconds = 0)
{
var provider = new TestRuntimeProvider()
{
LinkFactory = (s, t) => { t.SettleType = SettleMode.SettleOnDispose; return new TestLink(s, t, flowHang: true); }
};
AmqpConnectionListener listener = new AmqpConnectionListener(addressUri.AbsoluteUri, provider);
listener.Open();

try
{
try
{
var factory = new AmqpConnectionFactory();
var connection = await factory.OpenConnectionAsync(this.addressUri, AmqpConstants.DefaultTimeout);
var session = connection.CreateSession(new AmqpSessionSettings());
await session.OpenAsync(AmqpConstants.DefaultTimeout);

var link = new ReceivingAmqpLink(session, new AmqpLinkSettings() { Role = true, LinkName = "receiver", TotalLinkCredit = 0, Source = new Source(), Target = new Target() });
await link.OpenAsync(AmqpConstants.DefaultTimeout);

var cts = new CancellationTokenSource();
if (cancelBefore && timeoutMilliseconds == 0)
{
cts.Cancel();
}

if (timeoutMilliseconds > 0)
{
link.Settings.OperationTimeout = TimeSpan.FromMilliseconds(timeoutMilliseconds);
}

var task = link.DrainAsyc(cts.Token);
if (!cancelBefore && timeoutMilliseconds == 0)
{
await Task.Yield();
cts.Cancel();
}

await task;

Assert.True(false, "Exception not thrown");
}
catch (Exception exception)
{
if (timeoutMilliseconds > 0)
{
Assert.Equal(typeof(TimeoutException), exception.GetType());
}
else
{
Assert.Equal(typeof(TaskCanceledException), exception.GetType());
}
}
}
finally
{
listener.Close();
}
}

[Fact]
public async Task CbsSendTokenNoCancelTest()
{
Expand Down Expand Up @@ -714,16 +793,18 @@ class TestLink : AmqpLink
readonly bool sendHang;
readonly bool receiveHang;
readonly bool disposeHang;
readonly bool flowHang;

public TestLink(AmqpSession session, AmqpLinkSettings settings, bool openHang = false, bool closeHang = false,
bool sendHang = false, bool receiveHang = false, bool disposeHang = false)
bool sendHang = false, bool receiveHang = false, bool disposeHang = false, bool flowHang = false)
: base(session, settings)
{
this.openHang = openHang;
this.closeHang = closeHang;
this.sendHang = sendHang;
this.receiveHang = receiveHang;
this.disposeHang = disposeHang;
this.flowHang = flowHang;
}

public override bool CreateDelivery(Transfer transfer, out Delivery delivery)
Expand Down Expand Up @@ -768,6 +849,14 @@ protected override void OnDisposeDeliveryInternal(Delivery delivery)
this.DisposeDelivery(delivery, true, AmqpConstants.AcceptedOutcome);
}
}

protected override void OnReceiveFlow(Flow flow)
{
if (!this.flowHang)
{
base.OnReceiveFlow(flow);
}
}
}
}
}

0 comments on commit 9d29d1c

Please sign in to comment.