Skip to content
This repository has been archived by the owner on Aug 15, 2022. It is now read-only.

Implement TAP-based request-response flow on top of RPCs #399

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public virtual object Map(Type type, BMSByte stream)
obj = MapBMSByte(stream);
else if (type.IsEnum)
obj = MapBasicType(Enum.GetUnderlyingType(type), stream);
else if (typeof(ISerializable).IsAssignableFrom(type))
obj = MapSerializableType(type, stream);
else
obj = MapBasicType(type, stream);

Expand Down Expand Up @@ -76,6 +78,8 @@ public virtual T Map<T>(BMSByte stream)
obj = MapBMSByte(stream);
else if (genericType.IsEnum)
obj = MapBasicType(Enum.GetUnderlyingType(genericType), stream);
else if (typeof(ISerializable).IsAssignableFrom(typeof(T)))
obj = MapSerializableType(genericType, stream);
else
obj = MapBasicType(genericType, stream);

Expand Down Expand Up @@ -120,7 +124,10 @@ public object MapBasicType(Type type, BMSByte stream)
return stream.GetBasicType<Vector>();
else
// TODO: Make this an object mapper exception
throw new BaseNetworkException("The type " + type.ToString() + " is not allowed to be sent over the Network (yet)");
throw new BaseNetworkException(
"The type " + type.ToString() + " is not allowed to be sent over the Network (yet). For custom " +
"types, you can implement ISerializable."
);
}

/// <summary>
Expand Down Expand Up @@ -244,6 +251,8 @@ protected virtual void GetBytes(object o, Type type, ref BMSByte bytes)
bytes.Append(BitConverter.GetBytes(vec.y));
bytes.Append(BitConverter.GetBytes(vec.z));
}
else if (typeof(ISerializable).IsAssignableFrom(type))
((ISerializable)o).Serialize(bytes);
else if (type == null) //TODO: Check if this causes other issues
bytes.Append(new byte[1] { 0 });
else if (type == typeof(sbyte))
Expand Down Expand Up @@ -327,7 +336,10 @@ protected virtual void GetBytes(object o, Type type, ref BMSByte bytes)
else
{
// TODO: Make this a more appropriate exception
throw new BaseNetworkException("The type " + type.ToString() + " is not allowed to be sent over the Network (yet)");
throw new BaseNetworkException(
"The type " + type.ToString() + " is not allowed to be sent over the Network (yet). For custom " +
"types, you can implement ISerializable."
);
}
}

Expand Down Expand Up @@ -357,6 +369,12 @@ protected virtual byte[] GetBytesArray(object o, Type type)
Buffer.BlockCopy(BitConverter.GetBytes(vec.z), 0, bytes, sizeof(float) * 2, sizeof(float));
return bytes;
}
else if (typeof(ISerializable).IsAssignableFrom(type))
{
BMSByte buffer = new BMSByte();
((ISerializable)o).Serialize(buffer);
return buffer.byteArr;
}
else if (type == null) //TODO: Check if this causes other issues
return new byte[1] { 0 };
else if (type == typeof(sbyte))
Expand Down Expand Up @@ -452,10 +470,26 @@ protected virtual byte[] GetBytesArray(object o, Type type)
else
{
// TODO: Make this a more appropriate exception
throw new BaseNetworkException("The type " + type.ToString() + " is not allowed to be sent over the Network (yet)");
throw new BaseNetworkException(
"The type " + type.ToString() + " is not allowed to be sent over the Network (yet). For custom " +
"types, you can implement ISerializable."
);
}
}

/// <summary>
/// Get a .NET Quaternion out of a FrameStream
/// </summary>
/// <param name="type">Type of object to be mapped</param>
/// <param name="stream">FrameStream to be used</param>
/// <returns>A type of .NET Quaternion out of the FrameStream</returns>
public object MapSerializableType(Type type, BMSByte stream)
{
var obj = Activator.CreateInstance(type);
((ISerializable)obj).Unserialize(stream);
return obj;
}

/// <summary>
/// Creates a BMSByte using ObjectMapper
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace BeardedManStudios.Forge.Networking
{
/// <summary>
/// Represents the context of a request.
/// </summary>
public interface ISerializable
{
void Serialize(BMSByte buffer);
void Unserialize(BMSByte buffer);
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BeardedManStudios.Forge.Logging;
using BeardedManStudios.Forge.Networking.Frame;
using BeardedManStudios.Source.Forge.Networking;
using BeardedManStudios.Threading;

namespace BeardedManStudios.Forge.Networking
{
Expand Down Expand Up @@ -217,6 +220,11 @@ public abstract class NetworkObject
private static List<NetworkObject> pendingCreates = new List<NetworkObject>();
public static readonly object PendingCreatesLock = new object();

private long nextRequestId = 1;
private ConcurrentDictionary<long,TaskCompletionSource<object>> pendingRequests = new ConcurrentDictionary<long,TaskCompletionSource<object>>();
private Dictionary<string,(Action<RpcArgs> requestHandler,Action<RpcArgs> responseHandler)> requestHandlerMap =
new Dictionary<string,(Action<RpcArgs> requestHandler,Action<RpcArgs> responseHandler)>();

public byte[] Metadata { get; private set; }

/// <summary>
Expand Down Expand Up @@ -477,6 +485,8 @@ private void CreateNativeRpcs()
RegisterRpc("RemoveRpcFromBuffer", RemoveRpcFromBuffer);
RegisterRpc("TakeOwnership", TakeOwnership);
RegisterRpc("AssignOwnership", AssignOwnership, typeof(bool));
RegisterRpc("HandleRequest", HandleRequest, typeof(string), typeof(long), typeof(byte[]));
RegisterRpc("HandleResponse", HandleResponse, typeof(string), typeof(long), typeof(byte[]));
}

/// <summary>
Expand Down Expand Up @@ -651,7 +661,7 @@ public static void PlayerAccepted(NetworkingPlayer player, NetworkObject[] netwo
foreach (NetworkObject obj in networkObjects)
obj.currentRpcBufferCounts.Add(player, obj.rpcBuffer.Count);

Task.Queue(() =>
BeardedManStudios.Threading.Task.Queue(() =>
{
lock (player)
{
Expand Down Expand Up @@ -832,6 +842,199 @@ public bool RegisterOnce(uint id)
return true;
}

/// <summary>
/// Send a request to the specified receivers. A task is returned that will resolve with the response from the
/// other party.
/// </summary>
/// <param name="name">The name of the request to be sent</param>
/// <param name="receivers">The clients / server to receive the message</param>
/// <param name="data">Data to pass along with the request, this can be any serializable object</param>
public Task<T> SendRequest<T>(string name, Receivers receivers, object data = null)
{
return SendRequest<T>(null, name, receivers, data);
}

/// <summary>
/// Send a request to the specified receivers. A task is returned that will resolve with the response from the
/// other party.
/// </summary>
/// <param name="targetPlayer">The player that is being sent this RPC from the server</param>
/// <param name="name">The name of the request to be sent</param>
/// <param name="data">Data to pass along with the request, this can be any serializable object</param>
public Task<T> SendRequest<T>(NetworkingPlayer targetPlayer, string name, object data = null)
{
return SendRequest<T>(targetPlayer, name, Receivers.Target, data);
}

/// <summary>
/// Send a request to the specified receivers. A task is returned that will resolve with the response from all
/// other parties.
///
/// This method will not return until all parties have responded to the request.
/// </summary>
/// <param name="targetPlayers">An array of <see cref="NetworkingPlayer"/>s to send this RPC from the server</param>
/// <param name="name">The name of the request to send</param>
/// <param name="data">Data to pass along with the request, this can be any serializable object</param>
public Task<T[]> SendRequest<T>(NetworkingPlayer[] targetPlayers, string name, object data = null)
{
var tasks = new List<Task<T>>();
for (int i = 0; i < targetPlayers.Length; i++)
{
tasks.Add(SendRequest<T>(targetPlayers[i], name, Receivers.Target, data));
}
return Task.WhenAll(tasks.ToArray());
}

/// <summary>
/// Send a request to the specified receivers. A task is returned that will resolve with the response from the
/// other party.
/// </summary>
/// <param name="targetPlayer">The target player that should receive the request</param>
/// <param name="name">The name of the request to send</param>
/// <param name="receivers">The clients / server to receive the message</param>
/// <param name="data">Data to pass along with the request, this can be any serializable object</param>
/// <returns></returns>
public Task<T> SendRequest<T>(NetworkingPlayer targetPlayer, string name, Receivers receivers, object data = null)
{
if (!requestHandlerMap.ContainsKey(name))
throw new BaseNetworkException($"Sending request failed: request name '{name}' was not registered");

if (receivers != Receivers.Target && receivers != Receivers.Server && receivers != Receivers.Owner)
{
throw new BaseNetworkException(
$"Requests can only be sent to one receiver currently. If you want to send to multiple " +
"receivers, either send multiple requests or use the SendRequest overload using an array of " +
"networking players, which handles this for you."
);
}

var requestId = Interlocked.Increment(ref nextRequestId);
object[] args = new object[] { name, requestId, ObjectMapper.BMSByte(data).byteArr };
TaskCompletionSource<object> promise = new TaskCompletionSource<object>();

pendingRequests.TryAdd(requestId, promise);

BMSLog.Log($"Sending request '{name}' with ID {requestId} and data '{data}' to receivers '{receivers}'");

SendRpc(targetPlayer, "HandleRequest", false, receivers, args);

return promise.Task.ContinueWith((state) =>
{
try
{
return (T)state.Result;
}
catch (InvalidCastException e)
{
throw new BaseNetworkException(
$"You indicated when sending request '{name}' that the response would be of type " +
$"'{typeof(T)}', but the return type was set to a different type when registering the request",
e
);
}
});
}

/// <summary>
/// Registers a new request that can be sent to this object.
///
/// Requests are similar to RPCs. They only differ in that they can send back an arbitrary response and use a
/// TAP-based (Task-based Asynchronous Programming) async flow and are thus more convenient to use in scenarios
/// where you want to follow a HTTP-like or request-like flow. However, they tend to be slower because they
/// format incoming and outgoing data through the .NET binary formatter.
///
/// A generic object comes in. Cast it to the appropriate type you expect.
/// </summary>
/// <param name="name">The name of the request</param>
/// <param name="handler">The handler to execute when this request is received</param>
public void RegisterRequest<T1,T2>(string name, Func<RequestContext<T1>,Task<T2>> handler)
{
if (requestHandlerMap.ContainsKey(name))
throw new BaseNetworkException($"The request '{name}' has already been registered");

async void requestHandler(RpcArgs rpcArgs)
{
BMSLog.Log($"Received request '{name}' from {rpcArgs.Info.SendingPlayer.Ip}");

var requestId = rpcArgs.GetNext<long>();
var requestData = rpcArgs.GetNext<byte[]>();

BMSByte buffer = new BMSByte();
buffer.Clone(requestData);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may not be optimal as it copies data unnecessarily: I haven't checked if there is a way to create a BMSByte from an existing byte[] without copying.


T1 requestDataObject = ObjectMapper.Instance.Map<T1>(buffer);

BMSLog.Log($"Request ID is {requestId} and data is '{requestDataObject}'");

Task<T2> responsePromise = handler(new RequestContext<T1>(rpcArgs.Info.SendingPlayer, requestDataObject));

void sendResponseToSender(object newResponseObject)
{
byte[] responseData = ObjectMapper.BMSByte(newResponseObject).byteArr;

BMSLog.Log(
$"Sending back response '{newResponseObject}' to request '{name}' with ID {requestId}"
);

SendRpc(rpcArgs.Info.SendingPlayer, "HandleResponse", name, requestId, responseData);
};

sendResponseToSender(await responsePromise);
}

void responseHandler(RpcArgs rpcArgs)
{
BMSLog.Log($"Received response for request '{name}' from {rpcArgs.Info.SendingPlayer.Ip}");

var requestId = rpcArgs.GetNext<long>();
var responseData = rpcArgs.GetNext<byte[]>();

BMSByte buffer = new BMSByte();
buffer.Clone(responseData);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.


T2 responseObject = ObjectMapper.Instance.Map<T2>(buffer);

BMSLog.Log($"Response is for request ID {requestId} and data is '{responseObject}'");

if (!pendingRequests.TryRemove(requestId, out TaskCompletionSource<object> promise))
{
throw new BaseNetworkException(
$"Received response for request {requestId} ('{name}'), but this request was either " +
"not sent or a response was already received"
);
}

promise.TrySetResult(responseObject);
}

requestHandlerMap.Add(name, (requestHandler, responseHandler));
}

private void HandleRequest(RpcArgs args)
{
var name = args.GetNext<string>();

if (!requestHandlerMap.ContainsKey(name))
throw new BaseNetworkException($"Received request '{name}', but it was never registered and is not known");

Threading.Task.Queue(() => requestHandlerMap[name].requestHandler(args));
}

private void HandleResponse(RpcArgs args)
{
var name = args.GetNext<string>();

if (!requestHandlerMap.ContainsKey(name))
{
throw new BaseNetworkException(
$"Received response for a request '{name}', but it was never registered and could thus never " +
"have been sent"
);
}

Threading.Task.Queue(() => requestHandlerMap[name].responseHandler(args));
}

/// <summary>
/// This will register a method to this network object as an Rpc
/// </summary>
Expand Down Expand Up @@ -1591,7 +1794,7 @@ public void Destroy(int timeInMilliseconds = 0)
{
if (timeInMilliseconds > 0)
{
Task.Queue(() =>
BeardedManStudios.Threading.Task.Queue(() =>
{
Destroy(false);
}, timeInMilliseconds);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace BeardedManStudios.Forge.Networking
{
/// <summary>
/// Represents the context of a request.
/// </summary>
public class RequestContext<T>
{
private readonly NetworkingPlayer sender;
public NetworkingPlayer Sender { get => sender; }

private readonly T data;
public T Data { get => data; }

public RequestContext(NetworkingPlayer sender, T data)
{
this.sender = sender;
this.data = data;
}
}
}
Loading