This repository has been archived by the owner on Aug 15, 2022. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 305
Implement TAP-based request-response flow on top of RPCs #399
Open
NoTuxNoBux
wants to merge
1
commit into
BeardedManStudios:develop
Choose a base branch
from
NoTuxNoBux:feature/request-response-flow-tap
base: develop
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
11 changes: 11 additions & 0 deletions
11
...ity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/ISerializable.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
namespace BeardedManStudios.Forge.Networking | ||
{ | ||
/// <summary> | ||
/// Represents the context of a request. | ||
/// </summary> | ||
public interface ISerializable | ||
{ | ||
void Serialize(BMSByte buffer); | ||
void Unserialize(BMSByte buffer); | ||
} | ||
} |
11 changes: 11 additions & 0 deletions
11
...ssets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/ISerializable.cs.meta
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,12 @@ | ||
using System; | ||
using System.Collections.Concurrent; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using BeardedManStudios.Forge.Logging; | ||
using BeardedManStudios.Forge.Networking.Frame; | ||
using BeardedManStudios.Source.Forge.Networking; | ||
using BeardedManStudios.Threading; | ||
|
||
namespace BeardedManStudios.Forge.Networking | ||
{ | ||
|
@@ -217,6 +220,11 @@ public abstract class NetworkObject | |
private static List<NetworkObject> pendingCreates = new List<NetworkObject>(); | ||
public static readonly object PendingCreatesLock = new object(); | ||
|
||
private long nextRequestId = 1; | ||
private ConcurrentDictionary<long,TaskCompletionSource<object>> pendingRequests = new ConcurrentDictionary<long,TaskCompletionSource<object>>(); | ||
private Dictionary<string,(Action<RpcArgs> requestHandler,Action<RpcArgs> responseHandler)> requestHandlerMap = | ||
new Dictionary<string,(Action<RpcArgs> requestHandler,Action<RpcArgs> responseHandler)>(); | ||
|
||
public byte[] Metadata { get; private set; } | ||
|
||
/// <summary> | ||
|
@@ -477,6 +485,8 @@ private void CreateNativeRpcs() | |
RegisterRpc("RemoveRpcFromBuffer", RemoveRpcFromBuffer); | ||
RegisterRpc("TakeOwnership", TakeOwnership); | ||
RegisterRpc("AssignOwnership", AssignOwnership, typeof(bool)); | ||
RegisterRpc("HandleRequest", HandleRequest, typeof(string), typeof(long), typeof(byte[])); | ||
RegisterRpc("HandleResponse", HandleResponse, typeof(string), typeof(long), typeof(byte[])); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -651,7 +661,7 @@ public static void PlayerAccepted(NetworkingPlayer player, NetworkObject[] netwo | |
foreach (NetworkObject obj in networkObjects) | ||
obj.currentRpcBufferCounts.Add(player, obj.rpcBuffer.Count); | ||
|
||
Task.Queue(() => | ||
BeardedManStudios.Threading.Task.Queue(() => | ||
{ | ||
lock (player) | ||
{ | ||
|
@@ -832,6 +842,199 @@ public bool RegisterOnce(uint id) | |
return true; | ||
} | ||
|
||
/// <summary> | ||
/// Send a request to the specified receivers. A task is returned that will resolve with the response from the | ||
/// other party. | ||
/// </summary> | ||
/// <param name="name">The name of the request to be sent</param> | ||
/// <param name="receivers">The clients / server to receive the message</param> | ||
/// <param name="data">Data to pass along with the request, this can be any serializable object</param> | ||
public Task<T> SendRequest<T>(string name, Receivers receivers, object data = null) | ||
{ | ||
return SendRequest<T>(null, name, receivers, data); | ||
} | ||
|
||
/// <summary> | ||
/// Send a request to the specified receivers. A task is returned that will resolve with the response from the | ||
/// other party. | ||
/// </summary> | ||
/// <param name="targetPlayer">The player that is being sent this RPC from the server</param> | ||
/// <param name="name">The name of the request to be sent</param> | ||
/// <param name="data">Data to pass along with the request, this can be any serializable object</param> | ||
public Task<T> SendRequest<T>(NetworkingPlayer targetPlayer, string name, object data = null) | ||
{ | ||
return SendRequest<T>(targetPlayer, name, Receivers.Target, data); | ||
} | ||
|
||
/// <summary> | ||
/// Send a request to the specified receivers. A task is returned that will resolve with the response from all | ||
/// other parties. | ||
/// | ||
/// This method will not return until all parties have responded to the request. | ||
/// </summary> | ||
/// <param name="targetPlayers">An array of <see cref="NetworkingPlayer"/>s to send this RPC from the server</param> | ||
/// <param name="name">The name of the request to send</param> | ||
/// <param name="data">Data to pass along with the request, this can be any serializable object</param> | ||
public Task<T[]> SendRequest<T>(NetworkingPlayer[] targetPlayers, string name, object data = null) | ||
{ | ||
var tasks = new List<Task<T>>(); | ||
for (int i = 0; i < targetPlayers.Length; i++) | ||
{ | ||
tasks.Add(SendRequest<T>(targetPlayers[i], name, Receivers.Target, data)); | ||
} | ||
return Task.WhenAll(tasks.ToArray()); | ||
} | ||
|
||
/// <summary> | ||
/// Send a request to the specified receivers. A task is returned that will resolve with the response from the | ||
/// other party. | ||
/// </summary> | ||
/// <param name="targetPlayer">The target player that should receive the request</param> | ||
/// <param name="name">The name of the request to send</param> | ||
/// <param name="receivers">The clients / server to receive the message</param> | ||
/// <param name="data">Data to pass along with the request, this can be any serializable object</param> | ||
/// <returns></returns> | ||
public Task<T> SendRequest<T>(NetworkingPlayer targetPlayer, string name, Receivers receivers, object data = null) | ||
{ | ||
if (!requestHandlerMap.ContainsKey(name)) | ||
throw new BaseNetworkException($"Sending request failed: request name '{name}' was not registered"); | ||
|
||
if (receivers != Receivers.Target && receivers != Receivers.Server && receivers != Receivers.Owner) | ||
{ | ||
throw new BaseNetworkException( | ||
$"Requests can only be sent to one receiver currently. If you want to send to multiple " + | ||
"receivers, either send multiple requests or use the SendRequest overload using an array of " + | ||
"networking players, which handles this for you." | ||
); | ||
} | ||
|
||
var requestId = Interlocked.Increment(ref nextRequestId); | ||
object[] args = new object[] { name, requestId, ObjectMapper.BMSByte(data).byteArr }; | ||
TaskCompletionSource<object> promise = new TaskCompletionSource<object>(); | ||
|
||
pendingRequests.TryAdd(requestId, promise); | ||
|
||
BMSLog.Log($"Sending request '{name}' with ID {requestId} and data '{data}' to receivers '{receivers}'"); | ||
|
||
SendRpc(targetPlayer, "HandleRequest", false, receivers, args); | ||
|
||
return promise.Task.ContinueWith((state) => | ||
{ | ||
try | ||
{ | ||
return (T)state.Result; | ||
} | ||
catch (InvalidCastException e) | ||
{ | ||
throw new BaseNetworkException( | ||
$"You indicated when sending request '{name}' that the response would be of type " + | ||
$"'{typeof(T)}', but the return type was set to a different type when registering the request", | ||
e | ||
); | ||
} | ||
}); | ||
} | ||
|
||
/// <summary> | ||
/// Registers a new request that can be sent to this object. | ||
/// | ||
/// Requests are similar to RPCs. They only differ in that they can send back an arbitrary response and use a | ||
/// TAP-based (Task-based Asynchronous Programming) async flow and are thus more convenient to use in scenarios | ||
/// where you want to follow a HTTP-like or request-like flow. However, they tend to be slower because they | ||
/// format incoming and outgoing data through the .NET binary formatter. | ||
/// | ||
/// A generic object comes in. Cast it to the appropriate type you expect. | ||
/// </summary> | ||
/// <param name="name">The name of the request</param> | ||
/// <param name="handler">The handler to execute when this request is received</param> | ||
public void RegisterRequest<T1,T2>(string name, Func<RequestContext<T1>,Task<T2>> handler) | ||
{ | ||
if (requestHandlerMap.ContainsKey(name)) | ||
throw new BaseNetworkException($"The request '{name}' has already been registered"); | ||
|
||
async void requestHandler(RpcArgs rpcArgs) | ||
{ | ||
BMSLog.Log($"Received request '{name}' from {rpcArgs.Info.SendingPlayer.Ip}"); | ||
|
||
var requestId = rpcArgs.GetNext<long>(); | ||
var requestData = rpcArgs.GetNext<byte[]>(); | ||
|
||
BMSByte buffer = new BMSByte(); | ||
buffer.Clone(requestData); | ||
|
||
T1 requestDataObject = ObjectMapper.Instance.Map<T1>(buffer); | ||
|
||
BMSLog.Log($"Request ID is {requestId} and data is '{requestDataObject}'"); | ||
|
||
Task<T2> responsePromise = handler(new RequestContext<T1>(rpcArgs.Info.SendingPlayer, requestDataObject)); | ||
|
||
void sendResponseToSender(object newResponseObject) | ||
{ | ||
byte[] responseData = ObjectMapper.BMSByte(newResponseObject).byteArr; | ||
|
||
BMSLog.Log( | ||
$"Sending back response '{newResponseObject}' to request '{name}' with ID {requestId}" | ||
); | ||
|
||
SendRpc(rpcArgs.Info.SendingPlayer, "HandleResponse", name, requestId, responseData); | ||
}; | ||
|
||
sendResponseToSender(await responsePromise); | ||
} | ||
|
||
void responseHandler(RpcArgs rpcArgs) | ||
{ | ||
BMSLog.Log($"Received response for request '{name}' from {rpcArgs.Info.SendingPlayer.Ip}"); | ||
|
||
var requestId = rpcArgs.GetNext<long>(); | ||
var responseData = rpcArgs.GetNext<byte[]>(); | ||
|
||
BMSByte buffer = new BMSByte(); | ||
buffer.Clone(responseData); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. |
||
|
||
T2 responseObject = ObjectMapper.Instance.Map<T2>(buffer); | ||
|
||
BMSLog.Log($"Response is for request ID {requestId} and data is '{responseObject}'"); | ||
|
||
if (!pendingRequests.TryRemove(requestId, out TaskCompletionSource<object> promise)) | ||
{ | ||
throw new BaseNetworkException( | ||
$"Received response for request {requestId} ('{name}'), but this request was either " + | ||
"not sent or a response was already received" | ||
); | ||
} | ||
|
||
promise.TrySetResult(responseObject); | ||
} | ||
|
||
requestHandlerMap.Add(name, (requestHandler, responseHandler)); | ||
} | ||
|
||
private void HandleRequest(RpcArgs args) | ||
{ | ||
var name = args.GetNext<string>(); | ||
|
||
if (!requestHandlerMap.ContainsKey(name)) | ||
throw new BaseNetworkException($"Received request '{name}', but it was never registered and is not known"); | ||
|
||
Threading.Task.Queue(() => requestHandlerMap[name].requestHandler(args)); | ||
} | ||
|
||
private void HandleResponse(RpcArgs args) | ||
{ | ||
var name = args.GetNext<string>(); | ||
|
||
if (!requestHandlerMap.ContainsKey(name)) | ||
{ | ||
throw new BaseNetworkException( | ||
$"Received response for a request '{name}', but it was never registered and could thus never " + | ||
"have been sent" | ||
); | ||
} | ||
|
||
Threading.Task.Queue(() => requestHandlerMap[name].responseHandler(args)); | ||
} | ||
|
||
/// <summary> | ||
/// This will register a method to this network object as an Rpc | ||
/// </summary> | ||
|
@@ -1591,7 +1794,7 @@ public void Destroy(int timeInMilliseconds = 0) | |
{ | ||
if (timeInMilliseconds > 0) | ||
{ | ||
Task.Queue(() => | ||
BeardedManStudios.Threading.Task.Queue(() => | ||
{ | ||
Destroy(false); | ||
}, timeInMilliseconds); | ||
|
20 changes: 20 additions & 0 deletions
20
...ty/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/Objects/RequestContext.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
namespace BeardedManStudios.Forge.Networking | ||
{ | ||
/// <summary> | ||
/// Represents the context of a request. | ||
/// </summary> | ||
public class RequestContext<T> | ||
{ | ||
private readonly NetworkingPlayer sender; | ||
public NetworkingPlayer Sender { get => sender; } | ||
|
||
private readonly T data; | ||
public T Data { get => data; } | ||
|
||
public RequestContext(NetworkingPlayer sender, T data) | ||
{ | ||
this.sender = sender; | ||
this.data = data; | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may not be optimal as it copies data unnecessarily: I haven't checked if there is a way to create a
BMSByte
from an existingbyte[]
without copying.