From 0bdd1272a707d9d76d710ae581fc9e39927b7b44 Mon Sep 17 00:00:00 2001 From: Tom Gerrits Date: Fri, 15 Jan 2021 16:26:20 +0100 Subject: [PATCH] Implement TAP-based request-response flow on top of RPCs This introduces requests with responses, which are just RPCs that can send back a value. Requests are also just layered on top of RPCs and don't introduce any new special treatment, they are just some convenience sugar on top of RPCs where you want to send back a response. Sending a request will return a Task and follow TAP [1], so you can easily await the response to your request from the receiver. [1] https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap --- .../Forge/Networking/ObjectMapper.cs | 40 +++- .../Forge/Networking/Objects/ISerializable.cs | 11 + .../Networking/Objects/ISerializable.cs.meta | 11 + .../Forge/Networking/Objects/NetworkObject.cs | 209 +++++++++++++++++- .../Networking/Objects/RequestContext.cs | 20 ++ .../Networking/Objects/RequestContext.cs.meta | 11 + 6 files changed, 296 insertions(+), 6 deletions(-) create mode 100644 ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/ISerializable.cs create mode 100644 ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/ISerializable.cs.meta create mode 100644 ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/RequestContext.cs create mode 100644 ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/RequestContext.cs.meta diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/ObjectMapper.cs b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/ObjectMapper.cs index 6cbdee25..098c683a 100644 --- a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/ObjectMapper.cs +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/ObjectMapper.cs @@ -49,6 +49,8 @@ public virtual object Map(Type type, BMSByte stream) obj = MapBMSByte(stream); else if (type.IsEnum) obj = MapBasicType(Enum.GetUnderlyingType(type), stream); + else if (typeof(ISerializable).IsAssignableFrom(type)) + obj = MapSerializableType(type, stream); else obj = MapBasicType(type, stream); @@ -76,6 +78,8 @@ public virtual T Map(BMSByte stream) obj = MapBMSByte(stream); else if (genericType.IsEnum) obj = MapBasicType(Enum.GetUnderlyingType(genericType), stream); + else if (typeof(ISerializable).IsAssignableFrom(typeof(T))) + obj = MapSerializableType(genericType, stream); else obj = MapBasicType(genericType, stream); @@ -120,7 +124,10 @@ public object MapBasicType(Type type, BMSByte stream) return stream.GetBasicType(); else // TODO: Make this an object mapper exception - throw new BaseNetworkException("The type " + type.ToString() + " is not allowed to be sent over the Network (yet)"); + throw new BaseNetworkException( + "The type " + type.ToString() + " is not allowed to be sent over the Network (yet). For custom " + + "types, you can implement ISerializable." + ); } /// @@ -244,6 +251,8 @@ protected virtual void GetBytes(object o, Type type, ref BMSByte bytes) bytes.Append(BitConverter.GetBytes(vec.y)); bytes.Append(BitConverter.GetBytes(vec.z)); } + else if (typeof(ISerializable).IsAssignableFrom(type)) + ((ISerializable)o).Serialize(bytes); else if (type == null) //TODO: Check if this causes other issues bytes.Append(new byte[1] { 0 }); else if (type == typeof(sbyte)) @@ -327,7 +336,10 @@ protected virtual void GetBytes(object o, Type type, ref BMSByte bytes) else { // TODO: Make this a more appropriate exception - throw new BaseNetworkException("The type " + type.ToString() + " is not allowed to be sent over the Network (yet)"); + throw new BaseNetworkException( + "The type " + type.ToString() + " is not allowed to be sent over the Network (yet). For custom " + + "types, you can implement ISerializable." + ); } } @@ -357,6 +369,12 @@ protected virtual byte[] GetBytesArray(object o, Type type) Buffer.BlockCopy(BitConverter.GetBytes(vec.z), 0, bytes, sizeof(float) * 2, sizeof(float)); return bytes; } + else if (typeof(ISerializable).IsAssignableFrom(type)) + { + BMSByte buffer = new BMSByte(); + ((ISerializable)o).Serialize(buffer); + return buffer.byteArr; + } else if (type == null) //TODO: Check if this causes other issues return new byte[1] { 0 }; else if (type == typeof(sbyte)) @@ -452,10 +470,26 @@ protected virtual byte[] GetBytesArray(object o, Type type) else { // TODO: Make this a more appropriate exception - throw new BaseNetworkException("The type " + type.ToString() + " is not allowed to be sent over the Network (yet)"); + throw new BaseNetworkException( + "The type " + type.ToString() + " is not allowed to be sent over the Network (yet). For custom " + + "types, you can implement ISerializable." + ); } } + /// + /// Get a .NET Quaternion out of a FrameStream + /// + /// Type of object to be mapped + /// FrameStream to be used + /// A type of .NET Quaternion out of the FrameStream + public object MapSerializableType(Type type, BMSByte stream) + { + var obj = Activator.CreateInstance(type); + ((ISerializable)obj).Unserialize(stream); + return obj; + } + /// /// Creates a BMSByte using ObjectMapper /// diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/ISerializable.cs b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/ISerializable.cs new file mode 100644 index 00000000..31308f75 --- /dev/null +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/ISerializable.cs @@ -0,0 +1,11 @@ +namespace BeardedManStudios.Forge.Networking +{ + /// + /// Represents the context of a request. + /// + public interface ISerializable + { + void Serialize(BMSByte buffer); + void Unserialize(BMSByte buffer); + } +} diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/ISerializable.cs.meta b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/ISerializable.cs.meta new file mode 100644 index 00000000..7494be1c --- /dev/null +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/ISerializable.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 2eb0b0861e189144ba40072cf9fdbfcc +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/NetworkObject.cs b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/NetworkObject.cs index 8c038419..05c4cb6f 100644 --- a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/NetworkObject.cs +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/NetworkObject.cs @@ -1,9 +1,12 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using BeardedManStudios.Forge.Logging; using BeardedManStudios.Forge.Networking.Frame; using BeardedManStudios.Source.Forge.Networking; -using BeardedManStudios.Threading; namespace BeardedManStudios.Forge.Networking { @@ -217,6 +220,11 @@ public abstract class NetworkObject private static List pendingCreates = new List(); public static readonly object PendingCreatesLock = new object(); + private long nextRequestId = 1; + private ConcurrentDictionary> pendingRequests = new ConcurrentDictionary>(); + private Dictionary requestHandler,Action responseHandler)> requestHandlerMap = + new Dictionary requestHandler,Action responseHandler)>(); + public byte[] Metadata { get; private set; } /// @@ -477,6 +485,8 @@ private void CreateNativeRpcs() RegisterRpc("RemoveRpcFromBuffer", RemoveRpcFromBuffer); RegisterRpc("TakeOwnership", TakeOwnership); RegisterRpc("AssignOwnership", AssignOwnership, typeof(bool)); + RegisterRpc("HandleRequest", HandleRequest, typeof(string), typeof(long), typeof(byte[])); + RegisterRpc("HandleResponse", HandleResponse, typeof(string), typeof(long), typeof(byte[])); } /// @@ -651,7 +661,7 @@ public static void PlayerAccepted(NetworkingPlayer player, NetworkObject[] netwo foreach (NetworkObject obj in networkObjects) obj.currentRpcBufferCounts.Add(player, obj.rpcBuffer.Count); - Task.Queue(() => + BeardedManStudios.Threading.Task.Queue(() => { lock (player) { @@ -832,6 +842,199 @@ public bool RegisterOnce(uint id) return true; } + /// + /// Send a request to the specified receivers. A task is returned that will resolve with the response from the + /// other party. + /// + /// The name of the request to be sent + /// The clients / server to receive the message + /// Data to pass along with the request, this can be any serializable object + public Task SendRequest(string name, Receivers receivers, object data = null) + { + return SendRequest(null, name, receivers, data); + } + + /// + /// Send a request to the specified receivers. A task is returned that will resolve with the response from the + /// other party. + /// + /// The player that is being sent this RPC from the server + /// The name of the request to be sent + /// Data to pass along with the request, this can be any serializable object + public Task SendRequest(NetworkingPlayer targetPlayer, string name, object data = null) + { + return SendRequest(targetPlayer, name, Receivers.Target, data); + } + + /// + /// Send a request to the specified receivers. A task is returned that will resolve with the response from all + /// other parties. + /// + /// This method will not return until all parties have responded to the request. + /// + /// An array of s to send this RPC from the server + /// The name of the request to send + /// Data to pass along with the request, this can be any serializable object + public Task SendRequest(NetworkingPlayer[] targetPlayers, string name, object data = null) + { + var tasks = new List>(); + for (int i = 0; i < targetPlayers.Length; i++) + { + tasks.Add(SendRequest(targetPlayers[i], name, Receivers.Target, data)); + } + return Task.WhenAll(tasks.ToArray()); + } + + /// + /// Send a request to the specified receivers. A task is returned that will resolve with the response from the + /// other party. + /// + /// The target player that should receive the request + /// The name of the request to send + /// The clients / server to receive the message + /// Data to pass along with the request, this can be any serializable object + /// + public Task SendRequest(NetworkingPlayer targetPlayer, string name, Receivers receivers, object data = null) + { + if (!requestHandlerMap.ContainsKey(name)) + throw new BaseNetworkException($"Sending request failed: request name '{name}' was not registered"); + + if (receivers != Receivers.Target && receivers != Receivers.Server && receivers != Receivers.Owner) + { + throw new BaseNetworkException( + $"Requests can only be sent to one receiver currently. If you want to send to multiple " + + "receivers, either send multiple requests or use the SendRequest overload using an array of " + + "networking players, which handles this for you." + ); + } + + var requestId = Interlocked.Increment(ref nextRequestId); + object[] args = new object[] { name, requestId, ObjectMapper.BMSByte(data).byteArr }; + TaskCompletionSource promise = new TaskCompletionSource(); + + pendingRequests.TryAdd(requestId, promise); + + BMSLog.Log($"Sending request '{name}' with ID {requestId} and data '{data}' to receivers '{receivers}'"); + + SendRpc(targetPlayer, "HandleRequest", false, receivers, args); + + return promise.Task.ContinueWith((state) => + { + try + { + return (T)state.Result; + } + catch (InvalidCastException e) + { + throw new BaseNetworkException( + $"You indicated when sending request '{name}' that the response would be of type " + + $"'{typeof(T)}', but the return type was set to a different type when registering the request", + e + ); + } + }); + } + + /// + /// Registers a new request that can be sent to this object. + /// + /// Requests are similar to RPCs. They only differ in that they can send back an arbitrary response and use a + /// TAP-based (Task-based Asynchronous Programming) async flow and are thus more convenient to use in scenarios + /// where you want to follow a HTTP-like or request-like flow. However, they tend to be slower because they + /// format incoming and outgoing data through the .NET binary formatter. + /// + /// A generic object comes in. Cast it to the appropriate type you expect. + /// + /// The name of the request + /// The handler to execute when this request is received + public void RegisterRequest(string name, Func,Task> handler) + { + if (requestHandlerMap.ContainsKey(name)) + throw new BaseNetworkException($"The request '{name}' has already been registered"); + + async void requestHandler(RpcArgs rpcArgs) + { + BMSLog.Log($"Received request '{name}' from {rpcArgs.Info.SendingPlayer.Ip}"); + + var requestId = rpcArgs.GetNext(); + var requestData = rpcArgs.GetNext(); + + BMSByte buffer = new BMSByte(); + buffer.Clone(requestData); + + T1 requestDataObject = ObjectMapper.Instance.Map(buffer); + + BMSLog.Log($"Request ID is {requestId} and data is '{requestDataObject}'"); + + Task responsePromise = handler(new RequestContext(rpcArgs.Info.SendingPlayer, requestDataObject)); + + void sendResponseToSender(object newResponseObject) + { + byte[] responseData = ObjectMapper.BMSByte(newResponseObject).byteArr; + + BMSLog.Log( + $"Sending back response '{newResponseObject}' to request '{name}' with ID {requestId}" + ); + + SendRpc(rpcArgs.Info.SendingPlayer, "HandleResponse", name, requestId, responseData); + }; + + sendResponseToSender(await responsePromise); + } + + void responseHandler(RpcArgs rpcArgs) + { + BMSLog.Log($"Received response for request '{name}' from {rpcArgs.Info.SendingPlayer.Ip}"); + + var requestId = rpcArgs.GetNext(); + var responseData = rpcArgs.GetNext(); + + BMSByte buffer = new BMSByte(); + buffer.Clone(responseData); + + T2 responseObject = ObjectMapper.Instance.Map(buffer); + + BMSLog.Log($"Response is for request ID {requestId} and data is '{responseObject}'"); + + if (!pendingRequests.TryRemove(requestId, out TaskCompletionSource promise)) + { + throw new BaseNetworkException( + $"Received response for request {requestId} ('{name}'), but this request was either " + + "not sent or a response was already received" + ); + } + + promise.TrySetResult(responseObject); + } + + requestHandlerMap.Add(name, (requestHandler, responseHandler)); + } + + private void HandleRequest(RpcArgs args) + { + var name = args.GetNext(); + + if (!requestHandlerMap.ContainsKey(name)) + throw new BaseNetworkException($"Received request '{name}', but it was never registered and is not known"); + + Threading.Task.Queue(() => requestHandlerMap[name].requestHandler(args)); + } + + private void HandleResponse(RpcArgs args) + { + var name = args.GetNext(); + + if (!requestHandlerMap.ContainsKey(name)) + { + throw new BaseNetworkException( + $"Received response for a request '{name}', but it was never registered and could thus never " + + "have been sent" + ); + } + + Threading.Task.Queue(() => requestHandlerMap[name].responseHandler(args)); + } + /// /// This will register a method to this network object as an Rpc /// @@ -1591,7 +1794,7 @@ public void Destroy(int timeInMilliseconds = 0) { if (timeInMilliseconds > 0) { - Task.Queue(() => + BeardedManStudios.Threading.Task.Queue(() => { Destroy(false); }, timeInMilliseconds); diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/RequestContext.cs b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/RequestContext.cs new file mode 100644 index 00000000..f4dd898c --- /dev/null +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/RequestContext.cs @@ -0,0 +1,20 @@ +namespace BeardedManStudios.Forge.Networking +{ + /// + /// Represents the context of a request. + /// + public class RequestContext + { + private readonly NetworkingPlayer sender; + public NetworkingPlayer Sender { get => sender; } + + private readonly T data; + public T Data { get => data; } + + public RequestContext(NetworkingPlayer sender, T data) + { + this.sender = sender; + this.data = data; + } + } +} diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/RequestContext.cs.meta b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/RequestContext.cs.meta new file mode 100644 index 00000000..7c92756e --- /dev/null +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/RequestContext.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: b6fd196035acd094ea456f115a3646dd +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: