Skip to content

Commit

Permalink
Abstracted the Handshake process
Browse files Browse the repository at this point in the history
  • Loading branch information
paulpdaniels committed Aug 12, 2015
1 parent 03918b0 commit d718c71
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 45 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ build/
[Bb]in/
[Oo]bj/

NuGet/*.nupkg

# Enable "build/" folder in the NuGet Packages folder since NuGet packages use it for MSBuild targets
!packages/*/build/

Expand Down
2 changes: 2 additions & 0 deletions MiniWamp/MiniWamp.Net45.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@
<Compile Include="MessageType.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Serialization\JsonSerializer.cs" />
<Compile Include="Session\IHandshake.cs" />
<Compile Include="Session\IWampSession.cs" />
<Compile Include="Session\WampConnection.cs" />
<Compile Include="Transport\DefaultTransportFactory.Net45.cs" />
<Compile Include="Transport\ITransportFactory.cs" />
<Compile Include="Transport\WebSocketTransport.cs" />
Expand Down
2 changes: 2 additions & 0 deletions MiniWamp/MiniWamp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@
</PropertyGroup>
<ItemGroup>
<Compile Include="IWampSubscription.cs" />
<Compile Include="Session\WampConnection.cs" />
<Compile Include="Session\IHandshake.cs" />
<Compile Include="Session\IWampSession.cs" />
<Compile Include="Session\WampSession.cs" />
<Compile Include="Util\CompositeDisposable.cs" />
Expand Down
29 changes: 29 additions & 0 deletions MiniWamp/Session/IHandshake.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DapperWare.Session
{
public interface IHandshake
{
/// <summary>
/// Gets the transport that this handshake will operate over
/// </summary>
IWampTransport Transport { get; }

/// <summary>
/// Readies a transport session for use by a WampSession
/// </summary>
/// <returns>A task with the session id after negotiation</returns>
Task<string> Open();

/// <summary>
/// Terminates the connection gracefully
/// </summary>
/// <returns></returns>
Task Close();

}
}
2 changes: 1 addition & 1 deletion MiniWamp/Session/IWampSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface IWampSession
System.Threading.Tasks.Task<T> Call<T>(string method, params object[] content);

/// <summary>
/// Shuts down this session, unsubscribes all topics and closes the underlying transport
/// Shuts down this session, unsubscribes all topics
/// </summary>
void Close();

Expand Down
106 changes: 106 additions & 0 deletions MiniWamp/Session/WampConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DapperWare.Session
{
public class WampConnection
{
//Only one session can be active at any one time
private IWampSession _session;

/// <summary>
/// Gets the Url endpoint for this connection
/// </summary>
public string Endpoint { get; private set; }

/// <summary>
/// The underlying transport for this API
/// </summary>
public IWampTransport Transport { get; private set; }

/// <summary>
/// The protocol for establishing a session over this transport
/// </summary>
public IHandshake Handshake { get; private set; }

/// <summary>
/// Builds a new WampConnection
/// </summary>
/// <param name="url"></param>
public WampConnection(string url, IWampTransport transport)
{
this.Endpoint = url;
this.Transport = transport;
this.Handshake = new SimpleHandshake(this.Transport);
}

public async Task<IWampSession> Open()
{
//TODO This is not thread safe
if (_session == null)
{
//Handle the client<->server negotiation
var handshake = Handshake.Open();

await Transport.ConnectAsync(this.Endpoint);

var sessionid = await handshake;

_session = new WampSession(sessionid, Transport);
}

return _session;
}


private class SimpleHandshake : IHandshake
{
private TaskCompletionSource<string> _tcs;

public SimpleHandshake(IWampTransport transport)
{
this.Transport = transport;
}

public IWampTransport Transport
{
get;
private set;

}

public Task<string> Open()
{
_tcs = new TaskCompletionSource<string>();
this.Transport.Message += Transport_Message;
return _tcs.Task;
}

private void Transport_Message(object sender, WampMessageEventArgs e)
{
var type = (MessageType)e.Message[0].Value<int>();

if (type == MessageType.WELCOME)
{
_tcs.TrySetResult(e.Message[1].Value<string>());
}
else
{
_tcs.TrySetException(new Exception("Handshake Failed!"));
}

//We are done don't need this anymore
this.Transport.Message -= Transport_Message;
}

public Task Close()
{
throw new NotImplementedException();
}
}
}
}
48 changes: 14 additions & 34 deletions MiniWamp/Session/WampSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public class WampSession : IWampSession
private Dictionary<string, Action<Exception, JToken>> _pendingCalls;
private Dictionary<MessageType, Action<JArray>> _messageHandlers;
private IWampTransport _transport;
private TaskCompletionSource<bool> _welcomed;
private PrefixDictionary _prefixes;

#endregion
Expand Down Expand Up @@ -66,13 +65,13 @@ public Map<string, string> Prefixes
#endregion

#region Constructor
public WampSession(IWampTransport transport)
public WampSession(string sessionid, IWampTransport transport)
{
this.SessionId = sessionid;
this._pendingCalls = new Dictionary<string, Action<Exception, JToken>>();
this._messageHandlers = new Dictionary<MessageType, Action<JArray>>();
this._topics = new Dictionary<string, IWampSubscription>();

this._messageHandlers[MessageType.WELCOME] = OnWelcome;
this._messageHandlers[MessageType.CALLRESULT] = OnCallResult;
this._messageHandlers[MessageType.CALLERROR] = OnCallError;
this._messageHandlers[MessageType.EVENT] = OnEvent;
Expand Down Expand Up @@ -155,7 +154,7 @@ public IWampSubject<T> Subscribe<T>(string topic)
}
else
{
subscription = (WampSubscription<T>)found;
subscription = (IWampSubscription<T>)found;
}

return subscription.CreateSubject();
Expand All @@ -179,7 +178,9 @@ public void Publish<T>(string topic, T ev, bool excludeMe = false)

public void Publish<T>(string topic, T ev, IEnumerable<string> exclude, IEnumerable<string> eligible)
{
List<object> payload = new List<object> { MessageType.SUBSCRIBE,
List<object> payload = new List<object>
{
MessageType.SUBSCRIBE,
this._prefixes.Shrink(topic),
ev,
exclude.ToList(),
Expand Down Expand Up @@ -221,23 +222,16 @@ public void Unsubscribe()

#endregion

/// <summary>
/// Connects this session and waits for a welcome message from the server
/// </summary>
/// <param name="url"></param>
/// <returns></returns>
internal async Task ConnectAsync(string url)
{
this._welcomed = new TaskCompletionSource<bool>();
await this._transport.ConnectAsync(url);
await this._welcomed.Task;
}

public void Close()
{
this._transport.Close();
this._welcomed.TrySetCanceled();

//TODO Clear up all pending calls and close any existing subscriptions
this.Transport.Message -= transport_Message;

foreach (var item in this.Subscriptions)
{
item.Dispose();
}
}

/// <summary>
Expand All @@ -258,28 +252,14 @@ private void transport_Message(object sender, WampMessageEventArgs e)
else
{
//TODO Report errors
throw new WampException(String.Format("Message type {0} is either unknown or not supported at this time!", type));
}


}

#region Message Handlers

/// <summary>
/// Raised when the on welcome message has been received
/// </summary>
/// <param name="obj"></param>
private void OnWelcome(JArray obj)
{
if (this._welcomed != null)
{
this.SessionId = obj[1].Value<string>();
this._welcomed.SetResult(true);
}
}



/// <summary>
/// Raised when a new event has been received
/// </summary>
Expand Down
21 changes: 12 additions & 9 deletions MiniWamp/WampClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using DapperWare.Transport;
using DapperWare.Session;
using DapperWare.Transport;
using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -14,7 +15,7 @@ public class WampClient
/// </summary>
/// <param name="url"></param>
/// <returns></returns>
public static Task<WampSession> ConnectAsync(string url)
public static Task<IWampSession> ConnectAsync(string url)
{
return ConnectAsync(url, DefaultTransportFactory.Default);
}
Expand All @@ -31,7 +32,7 @@ public static Task<WampSession> ConnectAsync(string url)
/// <param name="url"></param>
/// <param name="factory"></param>
/// <returns></returns>
public static Task<WampSession> ConnectAsync(string url, ITransportFactory factory)
public static Task<IWampSession> ConnectAsync(string url, ITransportFactory factory)
{
return ConnectAsync(url, () => factory.Create());
}
Expand All @@ -42,13 +43,15 @@ public static Task<WampSession> ConnectAsync(string url, ITransportFactory facto
/// <param name="url"></param>
/// <param name="factoryFn"></param>
/// <returns></returns>
public static async Task<WampSession> ConnectAsync(string url, Func<IWampTransport> factoryFn)
public static async Task<IWampSession> ConnectAsync(string url, Func<IWampTransport> factoryFn)
{
var session = new WampSession(factoryFn());
var connection = new WampConnection(url, factoryFn());

await session.ConnectAsync(url);
//var session = new WampSession(factoryFn());

return session;
//await session.ConnectAsync(url);

return await connection.Open();
}

}
Expand All @@ -70,7 +73,7 @@ public class TestWampClient {

private Mock<IWampTransport> mockTransport;

private WampSession connection;
private IWampSession connection;

[SetUp]
public async void SetUp()
Expand All @@ -91,7 +94,7 @@ public async void SetUp()
[Test]
public void TestInitializedSession()
{
Assert.AreEqual(connection.SessionId, "mysessionid");
Assert.AreEqual("mysessionid", connection.SessionId);
}

[Test]
Expand Down
2 changes: 1 addition & 1 deletion MiniWampTests/WampSessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void Send(IEnumerable<object> message)

private ITransportFactory mockTransportFactory;
private MockWampTransport mockTransport;
private WampSession connection;
private IWampSession connection;


[TestInitialize]
Expand Down

0 comments on commit d718c71

Please sign in to comment.