Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/Azure/amqpnetlite into re…
Browse files Browse the repository at this point in the history
…lease-1.0
  • Loading branch information
xinchen10 committed Mar 7, 2016
2 parents 162d6d7 + 4ef0439 commit 485aad5
Show file tree
Hide file tree
Showing 21 changed files with 495 additions and 138 deletions.
4 changes: 4 additions & 0 deletions build.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ IF /I "%1" EQU "clean" (
GOTO :args-done
)

IF /I "%1" EQU "test" (
GOTO :build-done
)

:args-start
IF /I "%1" EQU "" GOTO args-done

Expand Down
2 changes: 1 addition & 1 deletion dnx/Amqp.DotNet/project.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "1.1.5",
"version": "1.1.7",
"description": "Amqp 1.0 Class Library for .NETFramework, .NETPlatform, and WinRT",
"authors": [ "xinchen" ],
"tags": [ "AMQP Micro-Framework netmf netcf iot winrt coreclr corefx .NetPlatform .NetCore UWP" ],
Expand Down
7 changes: 6 additions & 1 deletion src/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ enum State

internal const uint DefaultMaxFrameSize = 256 * 1024;
internal const ushort DefaultMaxSessions = 256;
internal const int DefaultMaxLinksPerSession = 64;
const uint MaxIdleTimeout = 30 * 60 * 1000;
static readonly TimerCallback onHeartBeatTimer = OnHeartBeatTimer;
readonly Address address;
Expand Down Expand Up @@ -133,9 +134,9 @@ internal Connection(IBufferManager bufferManager, AmqpSettings amqpSettings, Add
: this((ushort)(amqpSettings.MaxSessionsPerConnection - 1), (uint)amqpSettings.MaxFrameSize)
{
this.BufferManager = bufferManager;
this.MaxLinksPerSession = amqpSettings.MaxLinksPerSession;
this.address = address;
this.onOpened = onOpened;
this.maxFrameSize = (uint)amqpSettings.MaxFrameSize;
this.transport = transport;
transport.SetConnection(this);

Expand Down Expand Up @@ -170,6 +171,8 @@ internal IBufferManager BufferManager
private set;
}

internal int MaxLinksPerSession;

ByteBuffer AllocateBuffer(int size)
{
return this.BufferManager.GetByteBuffer(size);
Expand All @@ -180,6 +183,8 @@ ByteBuffer WrapBuffer(ByteBuffer buffer, int offset, int length)
return new WrappedByteBuffer(buffer, offset, length);
}
#else
internal int MaxLinksPerSession = DefaultMaxLinksPerSession;

ByteBuffer AllocateBuffer(int size)
{
return new ByteBuffer(size, true);
Expand Down
1 change: 1 addition & 0 deletions src/Framing/AmqpValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ internal T GetValue<T>()
}
else
{
this.valueBuffer.Seek(0);
return Serialization.AmqpSerializer.Deserialize<T>(this.valueBuffer);
}
}
Expand Down
35 changes: 34 additions & 1 deletion src/Framing/Begin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,61 +19,94 @@ namespace Amqp.Framing
{
using Amqp.Types;

sealed class Begin : DescribedList
/// <summary>
/// The Begin class contains parameters to begin a session in a connection.
/// </summary>
public sealed class Begin : DescribedList
{
/// <summary>
/// Initializes a Begin object.
/// </summary>
public Begin()
: base(Codec.Begin, 8)
{
}

/// <summary>
/// Gets or sets the remote-channel field.
/// </summary>
public ushort RemoteChannel
{
get { return this.Fields[0] == null ? ushort.MaxValue : (ushort)this.Fields[0]; }
set { this.Fields[0] = value; }
}

/// <summary>
/// Gets or sets the next-outgoing-id field.
/// </summary>
public uint NextOutgoingId
{
get { return this.Fields[1] == null ? uint.MinValue : (uint)this.Fields[1]; }
set { this.Fields[1] = value; }
}

/// <summary>
/// Gets or sets the incoming-window field.
/// </summary>
public uint IncomingWindow
{
get { return this.Fields[2] == null ? uint.MaxValue : (uint)this.Fields[2]; }
set { this.Fields[2] = value; }
}

/// <summary>
/// Gets or sets the outgoing-window field.
/// </summary>
public uint OutgoingWindow
{
get { return this.Fields[3] == null ? uint.MaxValue : (uint)this.Fields[3]; }
set { this.Fields[3] = value; }
}

/// <summary>
/// Gets or sets the handle-max field.
/// </summary>
public uint HandleMax
{
get { return this.Fields[4] == null ? uint.MaxValue : (uint)this.Fields[4]; }
set { this.Fields[4] = value; }
}

/// <summary>
/// Gets or sets the offered-capabilities field.
/// </summary>
public Symbol[] OfferedCapabilities
{
get { return Codec.GetSymbolMultiple(this.Fields, 5); }
set { this.Fields[5] = value; }
}

/// <summary>
/// Gets or sets the desired-capabilities field.
/// </summary>
public Symbol[] DesiredCapabilities
{
get { return Codec.GetSymbolMultiple(this.Fields, 6); }
set { this.Fields[6] = value; }
}

/// <summary>
/// Gets or sets the properties field.
/// </summary>
public Fields Properties
{
get { return Amqp.Types.Fields.From(this.Fields, 7); }
set { this.Fields[7] = value; }
}

/// <summary>
/// Returns a string that represents the current begin object.
/// </summary>
public override string ToString()
{
#if TRACE
Expand Down
4 changes: 4 additions & 0 deletions src/Listener/ContainerHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ bool IContainer.AttachLink(ListenerConnection connection, ListenerSession sessio
}

string address = attach.Role ? ((Source)attach.Source).Address : ((Target)attach.Target).Address;
if (string.IsNullOrWhiteSpace(address))
{
throw new AmqpException(ErrorCode.InvalidField, "The address field cannot be empty");
}

MessageProcessor messageProcessor;
if (TryGetProcessor(this.messageProcessors, address, out messageProcessor))
Expand Down
18 changes: 16 additions & 2 deletions src/Listener/ListenerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace Amqp.Listener
{
using System;
using System.Threading;
using Amqp.Framing;

Expand Down Expand Up @@ -48,8 +49,21 @@ internal string RemoteContainerId
internal override void OnBegin(ushort remoteChannel, Begin begin)
{
// this sends a begin to the remote peer
begin.RemoteChannel = remoteChannel;
var session = new ListenerSession(this, begin);
Begin local = new Begin()
{
RemoteChannel = remoteChannel,
IncomingWindow = Session.defaultWindowSize,
OutgoingWindow = begin.IncomingWindow,
NextOutgoingId = 0,
HandleMax = (uint)(this.listener.AMQP.MaxLinksPerSession - 1)
};

if (begin.HandleMax < local.HandleMax)
{
local.HandleMax = begin.HandleMax;
}

var session = new ListenerSession(this, local);

// this updates the local session state
begin.RemoteChannel = session.Channel;
Expand Down
2 changes: 1 addition & 1 deletion src/Listener/ListenerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace Amqp.Listener
public class ListenerSession : Session
{
internal ListenerSession(ListenerConnection connection, Begin begin)
: base(connection, begin)
: base(connection, begin, null)
{
}

Expand Down
2 changes: 1 addition & 1 deletion src/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public object Body
}
}

#if (NETFX || DOTNET || NETFX35)
#if (NETFX || DOTNET || NETFX35 || NETFX40)
/// <summary>
/// Gets an object of type T from the message body.
/// </summary>
Expand Down
11 changes: 10 additions & 1 deletion src/Net/AmqpSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,23 @@ public string HostName
}

/// <summary>
/// Gets or sets the open.channel-max field.
/// Gets or sets the open.channel-max field (less by one).
/// </summary>
public ushort MaxSessionsPerConnection
{
get;
set;
}

/// <summary>
/// Gets or sets the begin.handle-max field (less by one).
/// </summary>
public int MaxLinksPerSession
{
get;
set;
}

/// <summary>
/// Gets or sets the open.idle-time-out field.
/// </summary>
Expand Down
5 changes: 3 additions & 2 deletions src/Net/ConnectionFactoryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ protected ConnectionFactoryBase()
this.amqpSettings = new AmqpSettings()
{
MaxFrameSize = (int)Connection.DefaultMaxFrameSize,
ContainerId = "AMQPLite-" + Guid.NewGuid().ToString("N"),
ContainerId = "AMQPNetLite-" + Guid.NewGuid().ToString("N").Substring(0, 8),
IdleTimeout = int.MaxValue,
MaxSessionsPerConnection = 8
MaxSessionsPerConnection = Connection.DefaultMaxSessions,
MaxLinksPerSession = Connection.DefaultMaxLinksPerSession
};
}

Expand Down
10 changes: 10 additions & 0 deletions src/Net/TypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ internal static bool IsValueType(this Type type)
return type.IsValueType;
}

internal static bool IsEnum(this Type type)
{
return type.IsEnum;
}

internal static bool IsGenericType(this Type type)
{
return type.IsGenericType;
Expand Down Expand Up @@ -81,6 +86,11 @@ internal static bool IsValueType(this Type type)
return type.GetTypeInfo().IsValueType;
}

internal static bool IsEnum(this Type type)
{
return type.GetTypeInfo().IsEnum;
}

internal static bool IsGenericType(this Type type)
{
return type.GetTypeInfo().IsGenericType;
Expand Down
4 changes: 2 additions & 2 deletions src/Properties/Version.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@
// Build Number
// Revision
//
[assembly: AssemblyVersion("1.1.6")]
[assembly: AssemblyFileVersion("1.1.6")]
[assembly: AssemblyVersion("1.1.7")]
[assembly: AssemblyFileVersion("1.1.7")]
17 changes: 13 additions & 4 deletions src/SenderLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,20 @@ void WriteDelivery(Delivery delivery)
while (delivery != null)
{
delivery.Handle = this.Handle;
bool settled = delivery.Settled;
this.Session.SendDelivery(delivery);
if (settled && delivery.OnOutcome != null)

try
{
bool settled = delivery.Settled;
this.Session.SendDelivery(delivery);
if (settled && delivery.OnOutcome != null)
{
delivery.OnOutcome(delivery.Message, new Accepted(), delivery.UserToken);
}
}
catch
{
delivery.OnOutcome(delivery.Message, new Accepted(), delivery.UserToken);
this.writing = false;
throw;
}

lock (this.ThisLock)
Expand Down
37 changes: 22 additions & 15 deletions src/Serialization/AmqpSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -341,20 +341,34 @@ SerializableType CompileNonContractTypes(Type type)
return SerializableType.CreateObjectType(type);
}

if (typeof(Described).IsAssignableFrom(type))
{
return SerializableType.CreateObjectType(type);
}

if (typeof(IAmqpSerializable).IsAssignableFrom(type))
{
return SerializableType.CreateAmqpSerializableType(this, type);
}

if (typeof(Described).IsAssignableFrom(type))
if (type.IsGenericType() && type.GetGenericTypeDefinition() == typeof(Nullable<>))
{
return SerializableType.CreateAmqpDescribedType(this, type);
Type[] argTypes = type.GetGenericArguments();
Fx.Assert(argTypes.Length == 1, "Nullable type must have one argument");
Type argType = argTypes[0];
if (argType.IsEnum())
{
return CompileEnumType(argType);
}
else
{
return SerializableType.CreateObjectType(type);
}
}

SerializableType nullable = this.CompileNullableTypes(type);
if (nullable != null)
if (type.IsEnum())
{
return nullable;
return CompileEnumType(type);
}

SerializableType collection = this.CompileCollectionTypes(type);
Expand All @@ -366,17 +380,10 @@ SerializableType CompileNonContractTypes(Type type)
return null;
}

SerializableType CompileNullableTypes(Type type)
SerializableType CompileEnumType(Type type)
{
if (type.IsGenericType() &&
type.GetGenericTypeDefinition() == typeof(Nullable<>))
{
Type[] argTypes = type.GetGenericArguments();
Fx.Assert(argTypes.Length == 1, "Nullable type must have one argument");
return SerializableType.CreateNullableType(type, this.GetType(argTypes[0]));
}

return null;
SerializableType underlyingType = GetType(Enum.GetUnderlyingType(type));
return SerializableType.CreateEnumType(type, underlyingType);
}

SerializableType CompileCollectionTypes(Type type)
Expand Down
Loading

0 comments on commit 485aad5

Please sign in to comment.