diff --git a/build.cmd b/build.cmd index 04603536..9d154d60 100644 --- a/build.cmd +++ b/build.cmd @@ -36,6 +36,10 @@ IF /I "%1" EQU "clean" ( GOTO :args-done ) +IF /I "%1" EQU "test" ( + GOTO :build-done +) + :args-start IF /I "%1" EQU "" GOTO args-done diff --git a/dnx/Amqp.DotNet/project.json b/dnx/Amqp.DotNet/project.json index 7163040f..570fb8f8 100644 --- a/dnx/Amqp.DotNet/project.json +++ b/dnx/Amqp.DotNet/project.json @@ -1,5 +1,5 @@ { - "version": "1.1.5", + "version": "1.1.7", "description": "Amqp 1.0 Class Library for .NETFramework, .NETPlatform, and WinRT", "authors": [ "xinchen" ], "tags": [ "AMQP Micro-Framework netmf netcf iot winrt coreclr corefx .NetPlatform .NetCore UWP" ], diff --git a/src/Connection.cs b/src/Connection.cs index a4f7f245..dc9223b3 100644 --- a/src/Connection.cs +++ b/src/Connection.cs @@ -59,6 +59,7 @@ enum State internal const uint DefaultMaxFrameSize = 256 * 1024; internal const ushort DefaultMaxSessions = 256; + internal const int DefaultMaxLinksPerSession = 64; const uint MaxIdleTimeout = 30 * 60 * 1000; static readonly TimerCallback onHeartBeatTimer = OnHeartBeatTimer; readonly Address address; @@ -133,9 +134,9 @@ internal Connection(IBufferManager bufferManager, AmqpSettings amqpSettings, Add : this((ushort)(amqpSettings.MaxSessionsPerConnection - 1), (uint)amqpSettings.MaxFrameSize) { this.BufferManager = bufferManager; + this.MaxLinksPerSession = amqpSettings.MaxLinksPerSession; this.address = address; this.onOpened = onOpened; - this.maxFrameSize = (uint)amqpSettings.MaxFrameSize; this.transport = transport; transport.SetConnection(this); @@ -170,6 +171,8 @@ internal IBufferManager BufferManager private set; } + internal int MaxLinksPerSession; + ByteBuffer AllocateBuffer(int size) { return this.BufferManager.GetByteBuffer(size); @@ -180,6 +183,8 @@ ByteBuffer WrapBuffer(ByteBuffer buffer, int offset, int length) return new WrappedByteBuffer(buffer, offset, length); } #else + internal int MaxLinksPerSession = DefaultMaxLinksPerSession; + ByteBuffer AllocateBuffer(int size) { return new ByteBuffer(size, true); diff --git a/src/Framing/AmqpValue.cs b/src/Framing/AmqpValue.cs index e59affd3..56ae9c4b 100644 --- a/src/Framing/AmqpValue.cs +++ b/src/Framing/AmqpValue.cs @@ -92,6 +92,7 @@ internal T GetValue() } else { + this.valueBuffer.Seek(0); return Serialization.AmqpSerializer.Deserialize(this.valueBuffer); } } diff --git a/src/Framing/Begin.cs b/src/Framing/Begin.cs index fcb3e118..6c76ca72 100644 --- a/src/Framing/Begin.cs +++ b/src/Framing/Begin.cs @@ -19,61 +19,94 @@ namespace Amqp.Framing { using Amqp.Types; - sealed class Begin : DescribedList + /// + /// The Begin class contains parameters to begin a session in a connection. + /// + public sealed class Begin : DescribedList { + /// + /// Initializes a Begin object. + /// public Begin() : base(Codec.Begin, 8) { } + /// + /// Gets or sets the remote-channel field. + /// public ushort RemoteChannel { get { return this.Fields[0] == null ? ushort.MaxValue : (ushort)this.Fields[0]; } set { this.Fields[0] = value; } } + /// + /// Gets or sets the next-outgoing-id field. + /// public uint NextOutgoingId { get { return this.Fields[1] == null ? uint.MinValue : (uint)this.Fields[1]; } set { this.Fields[1] = value; } } + /// + /// Gets or sets the incoming-window field. + /// public uint IncomingWindow { get { return this.Fields[2] == null ? uint.MaxValue : (uint)this.Fields[2]; } set { this.Fields[2] = value; } } + /// + /// Gets or sets the outgoing-window field. + /// public uint OutgoingWindow { get { return this.Fields[3] == null ? uint.MaxValue : (uint)this.Fields[3]; } set { this.Fields[3] = value; } } + /// + /// Gets or sets the handle-max field. + /// public uint HandleMax { get { return this.Fields[4] == null ? uint.MaxValue : (uint)this.Fields[4]; } set { this.Fields[4] = value; } } + /// + /// Gets or sets the offered-capabilities field. + /// public Symbol[] OfferedCapabilities { get { return Codec.GetSymbolMultiple(this.Fields, 5); } set { this.Fields[5] = value; } } + /// + /// Gets or sets the desired-capabilities field. + /// public Symbol[] DesiredCapabilities { get { return Codec.GetSymbolMultiple(this.Fields, 6); } set { this.Fields[6] = value; } } + /// + /// Gets or sets the properties field. + /// public Fields Properties { get { return Amqp.Types.Fields.From(this.Fields, 7); } set { this.Fields[7] = value; } } + /// + /// Returns a string that represents the current begin object. + /// public override string ToString() { #if TRACE diff --git a/src/Listener/ContainerHost.cs b/src/Listener/ContainerHost.cs index b222a230..27444d5d 100644 --- a/src/Listener/ContainerHost.cs +++ b/src/Listener/ContainerHost.cs @@ -243,6 +243,10 @@ bool IContainer.AttachLink(ListenerConnection connection, ListenerSession sessio } string address = attach.Role ? ((Source)attach.Source).Address : ((Target)attach.Target).Address; + if (string.IsNullOrWhiteSpace(address)) + { + throw new AmqpException(ErrorCode.InvalidField, "The address field cannot be empty"); + } MessageProcessor messageProcessor; if (TryGetProcessor(this.messageProcessors, address, out messageProcessor)) diff --git a/src/Listener/ListenerConnection.cs b/src/Listener/ListenerConnection.cs index 09cbcc7e..3d3031d3 100644 --- a/src/Listener/ListenerConnection.cs +++ b/src/Listener/ListenerConnection.cs @@ -17,6 +17,7 @@ namespace Amqp.Listener { + using System; using System.Threading; using Amqp.Framing; @@ -48,8 +49,21 @@ internal string RemoteContainerId internal override void OnBegin(ushort remoteChannel, Begin begin) { // this sends a begin to the remote peer - begin.RemoteChannel = remoteChannel; - var session = new ListenerSession(this, begin); + Begin local = new Begin() + { + RemoteChannel = remoteChannel, + IncomingWindow = Session.defaultWindowSize, + OutgoingWindow = begin.IncomingWindow, + NextOutgoingId = 0, + HandleMax = (uint)(this.listener.AMQP.MaxLinksPerSession - 1) + }; + + if (begin.HandleMax < local.HandleMax) + { + local.HandleMax = begin.HandleMax; + } + + var session = new ListenerSession(this, local); // this updates the local session state begin.RemoteChannel = session.Channel; diff --git a/src/Listener/ListenerSession.cs b/src/Listener/ListenerSession.cs index bc003b24..e8c1f949 100644 --- a/src/Listener/ListenerSession.cs +++ b/src/Listener/ListenerSession.cs @@ -27,7 +27,7 @@ namespace Amqp.Listener public class ListenerSession : Session { internal ListenerSession(ListenerConnection connection, Begin begin) - : base(connection, begin) + : base(connection, begin, null) { } diff --git a/src/Message.cs b/src/Message.cs index 4fce46ea..49b06d8d 100644 --- a/src/Message.cs +++ b/src/Message.cs @@ -108,7 +108,7 @@ public object Body } } -#if (NETFX || DOTNET || NETFX35) +#if (NETFX || DOTNET || NETFX35 || NETFX40) /// /// Gets an object of type T from the message body. /// diff --git a/src/Net/AmqpSettings.cs b/src/Net/AmqpSettings.cs index cb8c9dc0..6066fced 100644 --- a/src/Net/AmqpSettings.cs +++ b/src/Net/AmqpSettings.cs @@ -50,7 +50,7 @@ public string HostName } /// - /// Gets or sets the open.channel-max field. + /// Gets or sets the open.channel-max field (less by one). /// public ushort MaxSessionsPerConnection { @@ -58,6 +58,15 @@ public ushort MaxSessionsPerConnection set; } + /// + /// Gets or sets the begin.handle-max field (less by one). + /// + public int MaxLinksPerSession + { + get; + set; + } + /// /// Gets or sets the open.idle-time-out field. /// diff --git a/src/Net/ConnectionFactoryBase.cs b/src/Net/ConnectionFactoryBase.cs index 81412f09..59d4f84a 100644 --- a/src/Net/ConnectionFactoryBase.cs +++ b/src/Net/ConnectionFactoryBase.cs @@ -41,9 +41,10 @@ protected ConnectionFactoryBase() this.amqpSettings = new AmqpSettings() { MaxFrameSize = (int)Connection.DefaultMaxFrameSize, - ContainerId = "AMQPLite-" + Guid.NewGuid().ToString("N"), + ContainerId = "AMQPNetLite-" + Guid.NewGuid().ToString("N").Substring(0, 8), IdleTimeout = int.MaxValue, - MaxSessionsPerConnection = 8 + MaxSessionsPerConnection = Connection.DefaultMaxSessions, + MaxLinksPerSession = Connection.DefaultMaxLinksPerSession }; } diff --git a/src/Net/TypeExtensions.cs b/src/Net/TypeExtensions.cs index e8ed002e..c6014298 100644 --- a/src/Net/TypeExtensions.cs +++ b/src/Net/TypeExtensions.cs @@ -40,6 +40,11 @@ internal static bool IsValueType(this Type type) return type.IsValueType; } + internal static bool IsEnum(this Type type) + { + return type.IsEnum; + } + internal static bool IsGenericType(this Type type) { return type.IsGenericType; @@ -81,6 +86,11 @@ internal static bool IsValueType(this Type type) return type.GetTypeInfo().IsValueType; } + internal static bool IsEnum(this Type type) + { + return type.GetTypeInfo().IsEnum; + } + internal static bool IsGenericType(this Type type) { return type.GetTypeInfo().IsGenericType; diff --git a/src/Properties/Version.cs b/src/Properties/Version.cs index f5cbfdb7..d5c494c6 100644 --- a/src/Properties/Version.cs +++ b/src/Properties/Version.cs @@ -24,5 +24,5 @@ // Build Number // Revision // -[assembly: AssemblyVersion("1.1.6")] -[assembly: AssemblyFileVersion("1.1.6")] +[assembly: AssemblyVersion("1.1.7")] +[assembly: AssemblyFileVersion("1.1.7")] diff --git a/src/SenderLink.cs b/src/SenderLink.cs index eadf910b..d8b853db 100644 --- a/src/SenderLink.cs +++ b/src/SenderLink.cs @@ -287,11 +287,20 @@ void WriteDelivery(Delivery delivery) while (delivery != null) { delivery.Handle = this.Handle; - bool settled = delivery.Settled; - this.Session.SendDelivery(delivery); - if (settled && delivery.OnOutcome != null) + + try + { + bool settled = delivery.Settled; + this.Session.SendDelivery(delivery); + if (settled && delivery.OnOutcome != null) + { + delivery.OnOutcome(delivery.Message, new Accepted(), delivery.UserToken); + } + } + catch { - delivery.OnOutcome(delivery.Message, new Accepted(), delivery.UserToken); + this.writing = false; + throw; } lock (this.ThisLock) diff --git a/src/Serialization/AmqpSerializer.cs b/src/Serialization/AmqpSerializer.cs index 409852fc..a09c2b0e 100644 --- a/src/Serialization/AmqpSerializer.cs +++ b/src/Serialization/AmqpSerializer.cs @@ -341,20 +341,34 @@ SerializableType CompileNonContractTypes(Type type) return SerializableType.CreateObjectType(type); } + if (typeof(Described).IsAssignableFrom(type)) + { + return SerializableType.CreateObjectType(type); + } + if (typeof(IAmqpSerializable).IsAssignableFrom(type)) { return SerializableType.CreateAmqpSerializableType(this, type); } - if (typeof(Described).IsAssignableFrom(type)) + if (type.IsGenericType() && type.GetGenericTypeDefinition() == typeof(Nullable<>)) { - return SerializableType.CreateAmqpDescribedType(this, type); + Type[] argTypes = type.GetGenericArguments(); + Fx.Assert(argTypes.Length == 1, "Nullable type must have one argument"); + Type argType = argTypes[0]; + if (argType.IsEnum()) + { + return CompileEnumType(argType); + } + else + { + return SerializableType.CreateObjectType(type); + } } - SerializableType nullable = this.CompileNullableTypes(type); - if (nullable != null) + if (type.IsEnum()) { - return nullable; + return CompileEnumType(type); } SerializableType collection = this.CompileCollectionTypes(type); @@ -366,17 +380,10 @@ SerializableType CompileNonContractTypes(Type type) return null; } - SerializableType CompileNullableTypes(Type type) + SerializableType CompileEnumType(Type type) { - if (type.IsGenericType() && - type.GetGenericTypeDefinition() == typeof(Nullable<>)) - { - Type[] argTypes = type.GetGenericArguments(); - Fx.Assert(argTypes.Length == 1, "Nullable type must have one argument"); - return SerializableType.CreateNullableType(type, this.GetType(argTypes[0])); - } - - return null; + SerializableType underlyingType = GetType(Enum.GetUnderlyingType(type)); + return SerializableType.CreateEnumType(type, underlyingType); } SerializableType CompileCollectionTypes(Type type) diff --git a/src/Serialization/SerializableType.cs b/src/Serialization/SerializableType.cs index 3c5d61f4..108cfd7f 100644 --- a/src/Serialization/SerializableType.cs +++ b/src/Serialization/SerializableType.cs @@ -73,9 +73,9 @@ public static SerializableType CreateObjectType(Type type) return new AmqpObjectType(type); } - public static SerializableType CreateNullableType(Type type, SerializableType argType) + public static SerializableType CreateEnumType(Type type, SerializableType underlyingType) { - return new NullableType(type, argType); + return new EnumType(type, underlyingType); } public static SerializableType CreateAmqpSerializableType(AmqpSerializer serializer, Type type) @@ -83,11 +83,6 @@ public static SerializableType CreateAmqpSerializableType(AmqpSerializer seriali return new AmqpSerializableType(serializer, type); } - public static SerializableType CreateAmqpDescribedType(AmqpSerializer serializer, Type type) - { - return new AmqpDescribedType(serializer, type); - } - public static SerializableType CreateGenericListType( AmqpSerializer serializer, Type type, @@ -195,11 +190,14 @@ public override object ReadObject(ByteBuffer buffer) } } - abstract class AmqpExtendedType : SerializableType + sealed class EnumType : SerializableType { - protected AmqpExtendedType(AmqpSerializer serializer, Type type) - : base(serializer, type) + readonly SerializableType underlyingType; + + public EnumType(Type type, SerializableType underlyingType) + : base(null, type) { + this.underlyingType = underlyingType; } public override void WriteObject(ByteBuffer buffer, object value) @@ -210,106 +208,57 @@ public override void WriteObject(ByteBuffer buffer, object value) } else { - this.EncodeObject(buffer, value); + value = Convert.ChangeType(value, this.underlyingType.type); + this.underlyingType.WriteObject(buffer, value); } } public override object ReadObject(ByteBuffer buffer) { - if (this.TryDecodeNull(buffer)) + object value = this.underlyingType.ReadObject(buffer); + if (value != null) { - return null; + value = Enum.ToObject(this.type, value); } - object container = this.type.CreateInstance(this.hasDefaultCtor); - this.DecodeObject(buffer, container); - return container; + return value; } - - protected bool TryDecodeNull(ByteBuffer buffer) - { - buffer.Validate(false, FixedWidth.FormatCode); - byte formatCode = buffer.Buffer[buffer.Offset]; - if (formatCode == FormatCode.Null) - { - buffer.Complete(FixedWidth.FormatCode); - return true; - } - else - { - return false; - } - } - - protected abstract void EncodeObject(ByteBuffer buffer, object value); - - protected abstract void DecodeObject(ByteBuffer buffer, object value); } - sealed class NullableType : AmqpExtendedType + sealed class AmqpSerializableType : SerializableType { - SerializableType argType; - - public NullableType(Type type, SerializableType argType) - : base(null, type) + public AmqpSerializableType(AmqpSerializer serializer, Type type) + : base(serializer, type) { - this.argType = argType; } - protected override void EncodeObject(ByteBuffer buffer, object value) + public override void WriteObject(ByteBuffer buffer, object value) { - this.argType.WriteObject(buffer, value); + if (value == null) + { + Encoder.WriteObject(buffer, value); + } + else + { + ((IAmqpSerializable)value).Encode(buffer); + } } public override object ReadObject(ByteBuffer buffer) { - if (this.TryDecodeNull(buffer)) + buffer.Validate(false, FixedWidth.FormatCode); + byte formatCode = buffer.Buffer[buffer.Offset]; + if (formatCode == FormatCode.Null) { + buffer.Complete(FixedWidth.FormatCode); return null; } - - return this.argType.ReadObject(buffer); - } - - protected override void DecodeObject(ByteBuffer buffer, object value) - { - throw new NotImplementedException(); - } - } - - sealed class AmqpSerializableType : AmqpExtendedType - { - public AmqpSerializableType(AmqpSerializer serializer, Type type) - : base(serializer, type) - { - } - - protected override void EncodeObject(ByteBuffer buffer, object value) - { - ((IAmqpSerializable)value).Encode(buffer); - } - - protected override void DecodeObject(ByteBuffer buffer, object value) - { - ((IAmqpSerializable)value).Decode(buffer); - } - } - - sealed class AmqpDescribedType : AmqpExtendedType - { - public AmqpDescribedType(AmqpSerializer serializer, Type type) - : base(serializer, type) - { - } - - protected override void EncodeObject(ByteBuffer buffer, object value) - { - ((Described)value).Encode(buffer); - } - - protected override void DecodeObject(ByteBuffer buffer, object value) - { - ((Described)value).Decode(buffer); + else + { + object value = this.type.CreateInstance(this.hasDefaultCtor); + ((IAmqpSerializable)value).Decode(buffer); + return value; + } } } diff --git a/src/Session.cs b/src/Session.cs index 807c7e91..875c4b2c 100644 --- a/src/Session.cs +++ b/src/Session.cs @@ -21,6 +21,13 @@ namespace Amqp using Amqp.Framing; using Amqp.Types; + /// + /// The callback that is invoked when a begin performative is received from peer. + /// + /// The session. + /// The received begin performative. + public delegate void OnBegin(Session session, Begin begin); + /// /// The Session class represents an AMQP session. /// @@ -38,9 +45,9 @@ enum State End } - const int DefaultMaxLinks = 64; - const uint defaultWindowSize = 2048; + internal const uint defaultWindowSize = 2048; readonly Connection connection; + readonly OnBegin onBegin; readonly ushort channel; uint handleMax; Link[] localLinks; @@ -64,15 +71,22 @@ enum State /// /// The connection within which to create the session. public Session(Connection connection) - : this(connection, new Begin() { IncomingWindow = defaultWindowSize, OutgoingWindow = defaultWindowSize, HandleMax = DefaultMaxLinks - 1 }) + : this(connection, Default(connection), null) { } - internal Session(Connection connection, Begin begin) + /// + /// Initializes a session object with a custom Begin performative. + /// + /// The connection in which the session will be created. + /// The Begin performative to be sent to the remote peer. + /// The callback to invoke when a begin is received from peer. + public Session(Connection connection, Begin begin, OnBegin onBegin) { this.connection = connection; + this.onBegin = onBegin; this.handleMax = begin.HandleMax; - this.nextOutgoingId = uint.MaxValue - 2u; + this.nextOutgoingId = begin.NextOutgoingId; this.incomingWindow = defaultWindowSize; this.outgoingWindow = begin.IncomingWindow; this.incomingDeliveryId = uint.MaxValue; @@ -82,8 +96,6 @@ internal Session(Connection connection, Begin begin) this.outgoingList = new LinkedList(); this.channel = connection.AddSession(this); - begin.IncomingWindow = this.incomingWindow; - begin.NextOutgoingId = this.nextOutgoingId; this.state = State.BeginSent; this.SendBegin(begin); } @@ -130,19 +142,32 @@ internal uint AddLink(Link link) this.ThrowIfEnded("AddLink"); lock (this.ThisLock) { + int index = -1; int count = this.localLinks.Length; for (int i = 0; i < count; ++i) { if (this.localLinks[i] == null) { - this.localLinks[i] = link; - return (uint)i; + if (index < 0) + { + index = i; + } + } + else if (string.Compare(this.localLinks[i].Name, link.Name) == 0) + { + throw new AmqpException(ErrorCode.NotAllowed, link.Name + " has been attached."); } } + if (index >= 0) + { + this.localLinks[index] = link; + return (uint)index; + } + if (count - 1 < this.handleMax) { - int size = Math.Min(count * 2, (int)this.handleMax + 1); + int size = (int)Math.Min(count * 2 - 1, this.handleMax) + 1; Link[] expanded = new Link[size]; Array.Copy(this.localLinks, expanded, count); this.localLinks = expanded; @@ -249,6 +274,11 @@ internal void OnBegin(ushort remoteChannel, Begin begin) { this.handleMax = begin.HandleMax; } + + if (this.onBegin != null) + { + this.onBegin(this, begin); + } } internal bool OnEnd(End end) @@ -358,7 +388,7 @@ internal virtual void OnAttach(Attach attach) for (int i = 0; i < this.localLinks.Length; ++i) { Link temp = this.localLinks[i]; - if (temp != null && temp.LinkState == LinkState.AttachSent && string.Compare(temp.Name, attach.LinkName) == 0) + if (temp != null && string.Compare(temp.Name, attach.LinkName) == 0) { link = temp; this.AddRemoteLink(attach.Handle, link); @@ -383,7 +413,7 @@ internal void AddRemoteLink(uint remoteHandle, Link link) int count = this.remoteLinks.Length; if (count - 1 < remoteHandle) { - int size = Math.Min(count * 2, (int)this.handleMax + 1); + int size = (int)Math.Min(count * 2 - 1, this.handleMax) + 1; Link[] expanded = new Link[size]; Array.Copy(this.remoteLinks, expanded, count); this.remoteLinks = expanded; @@ -400,6 +430,17 @@ internal void AddRemoteLink(uint remoteHandle, Link link) } } + static Begin Default(Connection connection) + { + return new Begin() + { + IncomingWindow = defaultWindowSize, + OutgoingWindow = defaultWindowSize, + HandleMax = (uint)(connection.MaxLinksPerSession - 1), + NextOutgoingId = uint.MaxValue - 2u + }; + } + void CancelPendingDeliveries(Error error) { Delivery toRealse; diff --git a/test/Test.Amqp.Net/AmqpSerializerTests.cs b/test/Test.Amqp.Net/AmqpSerializerTests.cs index 243ec4c9..4ae07db8 100644 --- a/test/Test.Amqp.Net/AmqpSerializerTests.cs +++ b/test/Test.Amqp.Net/AmqpSerializerTests.cs @@ -29,6 +29,128 @@ namespace Test.Amqp [TestClass] public class AmqpSerializerTests { + [TestMethod()] + public void AmqpSerializerPrimitiveTypeTest() + { + RunPrimitiveTypeTest(null); + + RunPrimitiveTypeTest(true); + RunPrimitiveTypeTest(false); + + RunPrimitiveTypeTest(byte.MinValue); + RunPrimitiveTypeTest(222); + RunPrimitiveTypeTest(byte.MaxValue); + + RunPrimitiveTypeTest(ushort.MinValue); + RunPrimitiveTypeTest(2222); + RunPrimitiveTypeTest(ushort.MaxValue); + + RunPrimitiveTypeTest(uint.MinValue); + RunPrimitiveTypeTest(22); + RunPrimitiveTypeTest(2222222); + RunPrimitiveTypeTest(uint.MaxValue); + + RunPrimitiveTypeTest(ulong.MinValue); + RunPrimitiveTypeTest(22); + RunPrimitiveTypeTest(2222222222222); + RunPrimitiveTypeTest(ulong.MaxValue); + + RunPrimitiveTypeTest(sbyte.MinValue); + RunPrimitiveTypeTest(-111); + RunPrimitiveTypeTest(0); + RunPrimitiveTypeTest(111); + RunPrimitiveTypeTest(sbyte.MaxValue); + + RunPrimitiveTypeTest(short.MinValue); + RunPrimitiveTypeTest(-11111); + RunPrimitiveTypeTest(0); + RunPrimitiveTypeTest(11111); + RunPrimitiveTypeTest(short.MaxValue); + + RunPrimitiveTypeTest(int.MinValue); + RunPrimitiveTypeTest(-22); + RunPrimitiveTypeTest(0); + RunPrimitiveTypeTest(2222222); + RunPrimitiveTypeTest(int.MaxValue); + + RunPrimitiveTypeTest(long.MinValue); + RunPrimitiveTypeTest(-222222222222); + RunPrimitiveTypeTest(22); + RunPrimitiveTypeTest(2222222222222); + RunPrimitiveTypeTest(long.MaxValue); + + RunPrimitiveTypeTest(float.MinValue); + RunPrimitiveTypeTest(-123.456F); + RunPrimitiveTypeTest(0); + RunPrimitiveTypeTest(123.456F); + RunPrimitiveTypeTest(float.MaxValue); + + RunPrimitiveTypeTest(double.MinValue); + RunPrimitiveTypeTest(-123.456789F); + RunPrimitiveTypeTest(0); + RunPrimitiveTypeTest(123.456789F); + RunPrimitiveTypeTest(double.MaxValue); + + RunPrimitiveTypeTest('A'); + RunPrimitiveTypeTest('δΈ­'); + + RunPrimitiveTypeTest(DateTime.MinValue.ToUniversalTime()); + RunPrimitiveTypeTest(DateTime.UtcNow); + RunPrimitiveTypeTest(DateTime.MaxValue.ToUniversalTime()); + + RunPrimitiveTypeTest(Guid.Empty); + RunPrimitiveTypeTest(Guid.NewGuid()); + + RunPrimitiveTypeTest(new byte[0]); + RunPrimitiveTypeTest(new byte[] { 4, 5, 6, 7, 8 }); + RunPrimitiveTypeTest(System.Text.Encoding.UTF8.GetBytes(new string('D', 888))); + + RunPrimitiveTypeTest(string.Empty); + RunPrimitiveTypeTest("test"); + RunPrimitiveTypeTest(new string('D', 888)); + + RunPrimitiveTypeTest(string.Empty); + RunPrimitiveTypeTest("test"); + RunPrimitiveTypeTest(new string('D', 888)); + + RunPrimitiveTypeTest(Category.Food); + RunPrimitiveTypeTest(Category.Food); + + RunPrimitiveTypeTest(456); + RunPrimitiveTypeTest(-1); + RunPrimitiveTypeTest(1234567890); + RunPrimitiveTypeTest(null); + } + + static void RunPrimitiveTypeTest(T value) + { + ByteBuffer b = new ByteBuffer(512, true); + AmqpSerializer.Serialize(b, value); + T o = AmqpSerializer.Deserialize(b); + + if (typeof(T) == typeof(DateTime)) + { + DateTime now = DateTime.UtcNow; + long x = Convert.ToInt64((now - (DateTime)(object)value).TotalMilliseconds); + long y = Convert.ToInt64((now - (DateTime)(object)o).TotalMilliseconds); + Assert.IsTrue(Math.Abs(x - y) < 2, "timestamp difference should be less than 2"); + } + else if (typeof(T) == typeof(byte[])) + { + byte[] b1 = (byte[])(object)value; + byte[] b2 = (byte[])(object)o; + Assert.AreEqual(b1.Length, b2.Length, "Count is not equal."); + for (int i = 0; i < b1.Length; ++i) + { + Assert.AreEqual(b1[i], b2[i], string.Format("The {0}th byte is not equal ({1} != {2}).", i, b1[i], b2[i])); + } + } + else + { + Assert.AreEqual(value, o, "value not equal after deserialize"); + } + } + [TestMethod()] public void AmqpSerializerListEncodingTest() { @@ -151,7 +273,7 @@ public void AmqpSerializerMapEncodingTest() // serializer test { var specification = new ComputerSpecification() { Cores = 2, RamSize = 4, Description = "netbook" }; - var product = new Product() { Name = "Computer", Price = 499.98, Weight = 30, Specification = specification }; + var product = new Product() { Name = "Computer", Price = 499.98, Weight = 30, Specification = specification, Category = Category.Electronic }; var buffer = new ByteBuffer(1024, true); AmqpSerializer.Serialize(buffer, product); @@ -164,6 +286,7 @@ public void AmqpSerializerMapEncodingTest() Assert.AreEqual(product.Name, product2.Name); Assert.AreEqual(product.Price, product2.Price); Assert.AreEqual(product.Weight, product2.Weight); + Assert.AreEqual(product.Category, product2.Category); var specification2 = product2.Specification as ComputerSpecification; Assert.IsTrue(specification2 != null); @@ -219,7 +342,8 @@ public void AmqpSerializerMapEncodingTest() { new Symbol("Name"), "Car" }, { new Symbol("Price"), 41200.0 }, { new Symbol("Weight"), 5600L }, - { new Symbol("Specification"), specification } + { new Symbol("Specification"), specification }, + { new Symbol("Category"), (sbyte)Category.Automotive } }); var buffer = new ByteBuffer(1024, true); @@ -229,6 +353,7 @@ public void AmqpSerializerMapEncodingTest() Assert.AreEqual("Car", product2.Name); Assert.AreEqual(41200.0, product2.Price); Assert.AreEqual(5600L, product2.Weight); + Assert.AreEqual(Category.Automotive, product2.Category); var specification2 = product2.Specification as CarSpecification; Assert.IsTrue(specification2 != null); @@ -382,6 +507,7 @@ static void MessageBodyTest(T value, Action validator) var inputMessage = new Message(value); var buffer = inputMessage.Encode(); var outputMessage = Message.Decode(buffer); + Assert.IsTrue(outputMessage.Body != null, "Body is not null"); var value2 = outputMessage.GetBody(); validator(value, value2); } diff --git a/test/Test.Amqp.Net/ContainerHostTests.cs b/test/Test.Amqp.Net/ContainerHostTests.cs index d44b02f9..0f6f8f08 100644 --- a/test/Test.Amqp.Net/ContainerHostTests.cs +++ b/test/Test.Amqp.Net/ContainerHostTests.cs @@ -40,8 +40,8 @@ public class ContainerHostTests [ClassInitialize] public static void Initialize(TestContext context) { - //Trace.TraceLevel = TraceLevel.Frame; - //Trace.TraceListener = (f, a) => System.Diagnostics.Trace.WriteLine(DateTime.Now.ToString("[hh:ss.fff]") + " " + string.Format(f, a)); + Trace.TraceLevel = TraceLevel.Frame; + Trace.TraceListener = (f, a) => System.Diagnostics.Trace.WriteLine(DateTime.Now.ToString("[hh:ss.fff]") + " " + string.Format(f, a)); } [TestInitialize] @@ -231,6 +231,67 @@ public void ContainerHostUnknownProcessorTest() connection.Close(); } + [TestMethod] + public void ContainerHostDefaultValueTest() + { + string name = MethodInfo.GetCurrentMethod().Name; + this.host.RegisterMessageProcessor(name, new TestMessageProcessor()); + + Open remoteOpen = null; + Begin remoteBegin = null; + Attach remoteAttach = null; + + var connection = new Connection(Address, null, new Open() { ContainerId = "c" }, (c, o) => remoteOpen = o); + var session = new Session(connection, new Begin() { NextOutgoingId = 3 }, (s, b) => remoteBegin = b); + var sender1 = new SenderLink(session, "send-link1", new Attach() { Role = false, Target = new Target() { Address = name }, Source = new Source() }, (l, a) => remoteAttach = a); + var sender2 = new SenderLink(session, "send-link2", new Attach() { Role = false, Target = new Target() { Address = name }, Source = new Source() }, (l, a) => remoteAttach = a); + + sender1.Send(new Message("m1")); + sender2.Send(new Message("m2")); + + session.Close(); + connection.Close(); + + Assert.IsTrue(remoteOpen != null, "remote open not received"); + Assert.IsTrue(remoteOpen.MaxFrameSize < uint.MaxValue, "max frame size not set"); + Assert.IsTrue(remoteOpen.ChannelMax < ushort.MaxValue, "channel max not set"); + + Assert.IsTrue(remoteBegin != null, "remote begin not received"); + Assert.IsTrue(remoteBegin.IncomingWindow < uint.MaxValue, "incoming window not set"); + Assert.IsTrue(remoteBegin.HandleMax < uint.MaxValue, "handle max not set"); + + Assert.IsTrue(remoteAttach != null, "remote attach not received"); + } + + [TestMethod] + public void ContainerHostMultiplexingTest() + { + string name = MethodInfo.GetCurrentMethod().Name; + this.host.RegisterMessageProcessor(name, new TestMessageProcessor()); + + int completed = 0; + ManualResetEvent doneEvent = new ManualResetEvent(false); + var connection = new Connection(Address, null, new Open() { ContainerId = name }, null); + for (int i = 0; i < 10; i++) + { + var session = new Session(connection, new Begin() { NextOutgoingId = (uint)i }, null); + for (int j = 0; j < 20; j++) + { + var link = new SenderLink(session, string.Join("-", name, i, j), name); + for (int k = 0; k < 30; k++) + { + link.Send( + new Message() { Properties = new Properties() { MessageId = string.Join("-", "msg", i, j, k) } }, + (m, o, s) => { if (Interlocked.Increment(ref completed) >= 10 * 20 * 30) doneEvent.Set(); }, + null); + } + } + } + + Assert.IsTrue(doneEvent.WaitOne(10000), "send not completed in time"); + connection.Close(); + } + [TestMethod] public void ContainerHostCloseTest() { @@ -314,18 +375,16 @@ public void DuplicateLinkNameSameSessionTest() var connection = new Connection(Address); var session = new Session(connection); var sender1 = new SenderLink(session, linkName, name); - var sender2 = new SenderLink(session, linkName, name); - sender1.Send(new Message("msg1"), SendTimeout); try { - sender2.Send(new Message("msg1"), SendTimeout); + var sender2 = new SenderLink(session, linkName, name); Assert.IsTrue(false, "Excpected exception not thrown"); } catch(AmqpException ae) { - Assert.AreEqual((Symbol)ErrorCode.Stolen, ae.Error.Condition); + Assert.AreEqual((Symbol)ErrorCode.NotAllowed, ae.Error.Condition); } sender1.Close(); @@ -406,6 +465,39 @@ public void DuplicateLinkNameDifferentRoleTest() connection.Close(); } + + [TestMethod] + public void InvalidAddresses() + { + var connection = new Connection(Address); + var session = new Session(connection); + + try + { + var invalidAddresses = new List() { null, "", " " }; + invalidAddresses.ForEach(addr => + { + var threw = false; + try + { + var sender = new SenderLink(session, "link with bad address", addr); + sender.Send(new Message("1")); + } + catch (AmqpException e) + { + Assert.AreEqual(ErrorCode.InvalidField, e.Error.Condition.ToString(), string.Format("Address '{0}' did not cause an amqp exception with the expected error condition", addr ?? "null")); + threw = true; + } + + Assert.IsTrue(threw, string.Format("Address '{0}' did not throw an amqp exception", addr ?? "null")); + }); + } + finally + { + session.Close(); + connection.Close(); + } + } } class TestMessageProcessor : IMessageProcessor diff --git a/test/Test.Amqp.Net/Types/Product.cs b/test/Test.Amqp.Net/Types/Product.cs index f1375fa4..95a13b4a 100644 --- a/test/Test.Amqp.Net/Types/Product.cs +++ b/test/Test.Amqp.Net/Types/Product.cs @@ -20,6 +20,17 @@ namespace Test.Amqp using System.Collections.Generic; using global::Amqp.Serialization; + enum Category : sbyte + { + Unspecified, + Electronic, + Housewares, + Sports, + Food, + Personal, + Automotive + } + [AmqpContract(Name = "test.amqp:product", Encoding = EncodingType.Map)] class Product { @@ -35,6 +46,9 @@ class Product [AmqpMember] public Specification Specification { get; set; } + [AmqpMember] + public Category Category { get; set; } + public Dictionary Properties; [System.Runtime.Serialization.OnSerializing] diff --git a/test/Test.Amqp.NetMF/LinkTests.cs b/test/Test.Amqp.NetMF/LinkTests.cs index 73164a37..a45d4879 100644 --- a/test/Test.Amqp.NetMF/LinkTests.cs +++ b/test/Test.Amqp.NetMF/LinkTests.cs @@ -737,5 +737,43 @@ public void TestMethod_SendEmptyMessage() Assert.IsTrue(threwArgEx, "Should throw an argument exception when sending an empty message."); } + +#if NETFX || NETFX_CORE + [TestMethod] +#endif + public void TestMethod_ConnectionCreateClose() + { + Connection connection = new Connection(address); + connection.Close(); + Assert.IsTrue(connection.Error == null, "connection has error!"); + } + +#if NETFX || NETFX_CORE + [TestMethod] +#endif + public void TestMethod_SessionCreateClose() + { + Connection connection = new Connection(address); + Session session = new Session(connection); + session.Close(0); + connection.Close(); + Assert.IsTrue(connection.Error == null, "connection has error!"); + } + +#if NETFX || NETFX_CORE + [TestMethod] +#endif + public void TestMethod_LinkCreateClose() + { + Connection connection = new Connection(address); + Session session = new Session(connection); + SenderLink sender = new SenderLink(session, "sender", "q1"); + ReceiverLink receiver = new ReceiverLink(session, "receiver", "q1"); + sender.Close(0); + receiver.Close(0); + session.Close(0); + connection.Close(); + Assert.IsTrue(connection.Error == null, "connection has error!"); + } } }