From 7cdc839ec7104f100ef09195c8f390f16a1a55cc Mon Sep 17 00:00:00 2001 From: Mohit Tejani Date: Tue, 4 Jun 2024 11:41:48 +0530 Subject: [PATCH] added entity implementation --- .../EndPoint/PubSub/SubscribeEndpoint.cs | 4 +- .../EndPoint/PubSub/UnsubscribeEndpoint.cs | 72 +++- src/Api/PubnubApi/Entity/Channel.cs | 30 +- src/Api/PubnubApi/Entity/ChannelGroup.cs | 4 +- src/Api/PubnubApi/Entity/SubscribeCapable.cs | 76 ++++ src/Api/PubnubApi/Entity/Subscription.cs | 28 +- src/Api/PubnubApi/Entity/SubscriptionSet.cs | 87 +++- .../EventEngine/Common/EventEmitter.cs | 95 ++++- .../Subscribe/SubscribeEventEngine.cs | 46 +- .../Derived/Pubsub/SubscribeCallbackExt.cs | 393 ++++++++++-------- src/Api/PubnubApi/Pubnub.cs | 14 +- src/Api/PubnubApiPCL/PubnubApiPCL.csproj | 1 + src/Api/PubnubApiUnity/PubnubApiUnity.csproj | 6 + 13 files changed, 588 insertions(+), 268 deletions(-) create mode 100644 src/Api/PubnubApi/Entity/SubscribeCapable.cs diff --git a/src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs b/src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs index 4142c946d..c0f913be4 100644 --- a/src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs +++ b/src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs @@ -30,6 +30,7 @@ public class SubscribeEndpoint: ISubscribeOperation private SubscribeEventEngineFactory subscribeEventEngineFactory; private PresenceOperation presenceOperation; private string instanceId { get; set; } + public EventEmitter EventEmitter { get; set; } public List SubscribeListenerList { get; @@ -126,8 +127,7 @@ private void Subscribe(string[] channels, string[] channelGroups, SubscriptionCu subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId); } else { var subscribeManager = new SubscribeManager2(config, jsonLibrary, unit, pubnubLog, pubnubTelemetryMgr, pubnubTokenMgr, PubnubInstance); - var eventEmitter = new EventEmitter(config, SubscribeListenerList, jsonLibrary, pubnubTokenMgr, pubnubLog, PubnubInstance); - subscribeEventEngine = subscribeEventEngineFactory.InitializeEventEngine(instanceId, PubnubInstance, config, subscribeManager, eventEmitter, jsonLibrary, StatusEmitter); + subscribeEventEngine = subscribeEventEngineFactory.InitializeEventEngine(instanceId, PubnubInstance, config, subscribeManager, this.EventEmitter, jsonLibrary, StatusEmitter); subscribeEventEngine.OnStateTransition += SubscribeEventEngine_OnStateTransition; subscribeEventEngine.OnEventQueued += SubscribeEventEngine_OnEventQueued; subscribeEventEngine.OnEffectDispatch += SubscribeEventEngine_OnEffectDispatch; diff --git a/src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeEndpoint.cs b/src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeEndpoint.cs index 4417ed2a3..71e6a5440 100644 --- a/src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeEndpoint.cs +++ b/src/Api/PubnubApi/EndPoint/PubSub/UnsubscribeEndpoint.cs @@ -5,8 +5,6 @@ using System.Collections.Generic; using System.Globalization; using System.Linq; -using System.Text; -using System.Threading; namespace PubnubApi.EndPoint { @@ -78,28 +76,62 @@ private void Unsubscribe(string[] channels, string[] channelGroups) if (this.subscribeEventEngineFactory.HasEventEngine(instanceId)) { subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId); - subscribeEventEngine.Unsubscribe(channels, channelGroups); - } else { - LoggingMethod.WriteToLog(pubnubLog, string.Format(CultureInfo.InvariantCulture, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, Attempted Unsubscribe without EventEngine subscribe."), config.LogVerbosity); - } - if (config.PresenceInterval > 0 && presenceEventEngineFactory.HasEventEngine(instanceId)) { - PresenceEventEngine presenceEventEngine = presenceEventEngineFactory.GetEventEngine(instanceId); - presenceEventEngine.EventQueue.Enqueue(new EventEngine.Presence.Events.LeftEvent() { Input = new EventEngine.Presence.Common.PresenceInput() { Channels = channels, ChannelGroups = channelGroups } }); - } - if (config.MaintainPresenceState) { - if (ChannelLocalUserState.TryGetValue(PubnubInstance.InstanceId, out - var userState)) { - foreach (var channelName in channels ?? new string[0]) { - userState.TryRemove(channelName, out _); - } + channels = channels ?? new string[] { }; + channelGroups = channelGroups ?? new string[] { }; + var channelsWithPresence = channels.Concat(channels.Select((c) => $"{c}-pnpres")).ToList(); + var filteredChannelNames = new List(subscribeEventEngine.Channels); + foreach (var c in channelsWithPresence) { + filteredChannelNames.Remove(c); } - if (ChannelGroupLocalUserState.TryGetValue(PubnubInstance.InstanceId, out - var channelGroupUserState)) { - foreach (var channelGroupName in channelGroups ?? new string[0]) { - channelGroupUserState.TryRemove(channelGroupName, out _); + var channelGroupsWithPresence = channelGroups.Concat(channelGroups.Select((cg) => $"{cg}-pnpres")).ToList(); + var filteredChannelGroupNames = new List(subscribeEventEngine.ChannelGroups); + foreach (var g in channelGroupsWithPresence) { + filteredChannelGroupNames.Remove(g); + } + if (subscribeEventEngine.Channels.Distinct().Count() != filteredChannelNames.Distinct().Count() || + subscribeEventEngine.ChannelGroups.Distinct().Count() != filteredChannelGroupNames.Distinct().Count()) { + + var channelsToRemove = FindUniqueCommonElements(subscribeEventEngine.Channels, channels.ToList()); + var channelGroupsToRemove = FindUniqueCommonElements(subscribeEventEngine.ChannelGroups, channelGroups.ToList()); + + subscribeEventEngine.Unsubscribe(channelsToRemove.ToArray(), channelGroupsToRemove.ToArray()); + + if (config.PresenceInterval > 0 && presenceEventEngineFactory.HasEventEngine(instanceId)) { + PresenceEventEngine presenceEventEngine = presenceEventEngineFactory.GetEventEngine(instanceId); + presenceEventEngine.EventQueue.Enqueue(new EventEngine.Presence.Events.LeftEvent() { Input = new EventEngine.Presence.Common.PresenceInput() { Channels = channelsToRemove.ToArray(), ChannelGroups = channelGroupsToRemove.ToArray() } }); } + if (config.MaintainPresenceState) { + if (ChannelLocalUserState.TryGetValue(PubnubInstance.InstanceId, out + var userState)) { + foreach (var channelName in channels ?? new string[0]) { + userState.TryRemove(channelName, out _); + } + } + if (ChannelGroupLocalUserState.TryGetValue(PubnubInstance.InstanceId, out + var channelGroupUserState)) { + foreach (var channelGroupName in channelGroups ?? new string[0]) { + channelGroupUserState.TryRemove(channelGroupName, out _); + } + } + } + } else { + subscribeEventEngine.Channels = filteredChannelNames; + subscribeEventEngine.ChannelGroups = filteredChannelGroupNames; } + + } else { + LoggingMethod.WriteToLog(pubnubLog, string.Format(CultureInfo.InvariantCulture, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, Attempted Unsubscribe without EventEngine subscribe."), config.LogVerbosity); } + + } + + private IEnumerable FindUniqueCommonElements(List a, List b) + { + return a + .Where(value => + b.Contains(value) && + a.IndexOf(value) == a.LastIndexOf(value) && + b.IndexOf(value) == b.LastIndexOf(value)); } } } diff --git a/src/Api/PubnubApi/Entity/Channel.cs b/src/Api/PubnubApi/Entity/Channel.cs index e5374c6d8..c977817d3 100644 --- a/src/Api/PubnubApi/Entity/Channel.cs +++ b/src/Api/PubnubApi/Entity/Channel.cs @@ -2,22 +2,24 @@ using System.Linq; using PubnubApi.EventEngine.Common; -namespace PubnubApi { -public class Channel +namespace PubnubApi { - public string Name { get; set; } - private Pubnub Pubnub { get; set; } - private EventEmitter EventEmitter { get; set; } - public Channel(string name, Pubnub pubnub, EventEmitter eventEmitter) + public class Channel { - Name = name; - this.Pubnub = pubnub; - this.EventEmitter = eventEmitter; - } + public string Name { get; set; } + private Pubnub Pubnub { get; set; } + private EventEmitter EventEmitter { get; set; } - Subscription Subscription(SubscriptionOptions options = SubscriptionOptions.None) - { - return new Subscription(this.Name, options, this.Pubnub, this.EventEmitter); + public Channel(string name, Pubnub pubnub, EventEmitter eventEmitter) + { + Name = name; + this.Pubnub = pubnub; + this.EventEmitter = eventEmitter; + } + + public Subscription Subscription(SubscriptionOptions options = SubscriptionOptions.None) + { + return new Subscription(options == SubscriptionOptions.ReceivePresenceEvents ? new string[] { Name, $"{Name}-pnpres" } : new string[] { Name }, new string[] { }, options, this.Pubnub, this.EventEmitter); + } } -} } \ No newline at end of file diff --git a/src/Api/PubnubApi/Entity/ChannelGroup.cs b/src/Api/PubnubApi/Entity/ChannelGroup.cs index 364227651..c0c799374 100644 --- a/src/Api/PubnubApi/Entity/ChannelGroup.cs +++ b/src/Api/PubnubApi/Entity/ChannelGroup.cs @@ -16,9 +16,9 @@ public ChannelGroup(string name, Pubnub pubnub, EventEmitter eventEmitter) this.EventEmitter = eventEmitter; } - Subscription Subscription(SubscriptionOptions options = SubscriptionOptions.None) + public Subscription Subscription(SubscriptionOptions options = SubscriptionOptions.None) { - return new Subscription(this.Name, options, this.Pubnub, this.EventEmitter); + return new Subscription(new string[] { }, options == SubscriptionOptions.ReceivePresenceEvents ? new string[] { Name, $"{Name}-pnpres" } : new string[] { Name }, options, this.Pubnub, this.EventEmitter); } } } \ No newline at end of file diff --git a/src/Api/PubnubApi/Entity/SubscribeCapable.cs b/src/Api/PubnubApi/Entity/SubscribeCapable.cs new file mode 100644 index 000000000..0c71b43da --- /dev/null +++ b/src/Api/PubnubApi/Entity/SubscribeCapable.cs @@ -0,0 +1,76 @@ +using System; +using System.Collections.Generic; +using PubnubApi.EventEngine.Common; +using PubnubApi.EventEngine.Subscribe.Common; + +namespace PubnubApi +{ + public abstract class SubscribeCapable + { + public abstract List ChannelNames { get; set; } + public abstract List ChannelGroupNames { get; set; } + public abstract Pubnub Pubnub { get; set; } + public abstract EventEmitter EventEmitter { get; set; } + public abstract SubscriptionOptions Options { get; set; } + public abstract SubscribeCallbackExt Listener { get; set; } + + public Action OnPresence { + set { + Listener.presenceAction = value; + } + } + public Action OnObjects { + set { + Listener.objectEventAction = value; + } + } + public Action OnFile { + set { + Listener.fileAction = value; + } + } + public Action OnMessageAction { + set { + Listener.messageAction = value; + } + } + public Action> OnMessage { + set { + Listener.subscribeAction = value; + } + } + public Action> OnSignal { + set { + Listener.signalAction = value; + } + } + + public void Subscribe(SubscriptionCursor cursor = null) + { + var subscription = this.Pubnub.Subscribe().Channels(this.ChannelNames.ToArray()).ChannelGroups(this.ChannelGroupNames.ToArray()); + if (cursor is not null && cursor.Timetoken != 0) { + var timetoken = cursor.Timetoken ?? 0; + subscription.WithTimetoken(timetoken).Execute(); + + } else { + subscription.Execute(); + } + } + + public void AddListener(SubscribeCallbackExt listener) + { + this.EventEmitter.AddListener(listener, this.ChannelNames.ToArray(), this.ChannelGroupNames.ToArray()); + } + + public void RemoveListener(SubscribeCallbackExt listener) + { + this.EventEmitter.RemoveListener(listener, this.ChannelNames.ToArray(), this.ChannelGroupNames.ToArray()); + } + + public void UnSubscribe() + { + this.Pubnub.Unsubscribe().Channels(ChannelNames.ToArray()).ChannelGroups(ChannelGroupNames.ToArray()).Execute(); + } + } +} + diff --git a/src/Api/PubnubApi/Entity/Subscription.cs b/src/Api/PubnubApi/Entity/Subscription.cs index 458562cc8..114f5139d 100644 --- a/src/Api/PubnubApi/Entity/Subscription.cs +++ b/src/Api/PubnubApi/Entity/Subscription.cs @@ -1,28 +1,34 @@ using System; +using System.Collections.Generic; using System.Linq; using PubnubApi.EventEngine.Common; namespace PubnubApi { - public class Subscription + public class Subscription : SubscribeCapable { - public string[] Names { get; set; } = new string[] { }; - private Pubnub Pubnub { get; set; } - private EventEmitter EventEmitter { get; set; } + public override List ChannelNames { get; set; } = new List(); + public override List ChannelGroupNames { get; set; } = new List(); + public override Pubnub Pubnub { get; set; } + public override EventEmitter EventEmitter { get; set; } + public override SubscriptionOptions Options { get; set; } + public override SubscribeCallbackExt Listener { get; set; } = new SubscribeCallbackExt(); - public Subscription(string name, SubscriptionOptions options, Pubnub pubnub, EventEmitter eventEmitter) + public Subscription(string[] channels, string[] channelGroups, SubscriptionOptions options, Pubnub pubnub, EventEmitter eventEmitter) { - Names.ToList().Add(name); - if (options == SubscriptionOptions.ReceivePresenceEvents) { - Names.ToList().Add($"{name}-pnpres"); - } + this.ChannelNames = channels.ToList(); + this.ChannelGroupNames = channelGroups.ToList(); this.Pubnub = pubnub; this.EventEmitter = eventEmitter; + this.Options = options; + this.EventEmitter.AddListener(Listener, channels, channelGroups); } - SubscriptionSet AddSubscription(Subscription subscription) + public SubscriptionSet Add(Subscription subscription) { - return new SubscriptionSet(); + this.ChannelNames.AddRange(subscription.ChannelNames); + this.ChannelGroupNames.AddRange(subscription.ChannelGroupNames); + return new SubscriptionSet(this.ChannelNames.ToArray(),this.ChannelGroupNames.ToArray() , this.Options, this.Pubnub, this.EventEmitter); } } } \ No newline at end of file diff --git a/src/Api/PubnubApi/Entity/SubscriptionSet.cs b/src/Api/PubnubApi/Entity/SubscriptionSet.cs index 0f3d501a0..2c50843fd 100644 --- a/src/Api/PubnubApi/Entity/SubscriptionSet.cs +++ b/src/Api/PubnubApi/Entity/SubscriptionSet.cs @@ -1,26 +1,99 @@ using System; using System.Linq; +using System.Collections.Generic; using PubnubApi.EventEngine.Common; namespace PubnubApi { - class SubscriptionSet + public class SubscriptionSet : SubscribeCapable { - public SubscriptionSet() + public override List ChannelNames { get; set; } = new List(); + public override List ChannelGroupNames { get; set; } = new List(); + public override Pubnub Pubnub { get; set; } + public override EventEmitter EventEmitter { get; set; } + public override SubscriptionOptions Options { get; set; } + List SubscriptionList { get; set; } = new List(); + public override SubscribeCallbackExt Listener { get; set; } = new SubscribeCallbackExt(); + + public SubscriptionSet(string[] channels, string[] channelGroups, SubscriptionOptions options, Pubnub pubnub, EventEmitter eventEmitter) + { + this.Pubnub = pubnub; + this.EventEmitter = eventEmitter; + this.Options = options; + + foreach (var c in channels + .Where(c => !c.EndsWith("-pnpres"))) { + var subscription = this.Pubnub.Channel(c).Subscription(this.Options); + this.ChannelNames.AddRange(subscription.ChannelNames); + this.SubscriptionList.Add(subscription); + } + + foreach (var cg in channelGroups + .Where(cg => !cg.EndsWith("-pnpres"))) { + var subscription = this.Pubnub.ChannelGroup(cg).Subscription(this.Options); + this.ChannelGroupNames.AddRange(subscription.ChannelGroupNames); + this.SubscriptionList.Add(subscription); + } + } + + public SubscriptionSet Add(Subscription subscription) { - // Ctor with Channels and channelGroups + this.SubscriptionList.ToList().Add(subscription); + if (subscription.ChannelNames.Count > 0) { + this.ChannelNames.AddRange(subscription.ChannelNames); + } + if (subscription.ChannelGroupNames.Count > 0) { + this.ChannelGroupNames.AddRange(subscription.ChannelGroupNames); + } + this.EventEmitter.AddListener(this.Listener, subscription.ChannelNames.ToArray(), subscription.ChannelGroupNames.ToArray()); + return this; + } + + public SubscriptionSet Remove(Subscription subscription) + { + this.SubscriptionList.Remove(subscription); + if (subscription.ChannelNames.Count > 0) { + foreach (var c in subscription.ChannelNames) { + this.ChannelNames.Remove(c); + } + } + if (subscription.ChannelGroupNames.Count > 0) { + foreach (var g in subscription.ChannelGroupNames) { + this.ChannelGroupNames.Remove(g); + } + } + this.EventEmitter.RemoveListener(this.Listener, subscription.ChannelNames.ToArray(), subscription.ChannelGroupNames.ToArray()); + return this; } - SubscriptionSet AddSubscription(Subscription subscription) + public SubscriptionSet Add(SubscriptionSet subscriptionSet) { - // CRUD + this.SubscriptionList.AddRange(subscriptionSet.SubscriptionList); + if (subscriptionSet.ChannelNames.Count > 0) { + this.ChannelNames.AddRange(subscriptionSet.ChannelNames); + } + if (subscriptionSet.ChannelGroupNames.Count > 0) { + this.ChannelGroupNames.AddRange(subscriptionSet.ChannelGroupNames); + } + this.EventEmitter.AddListener(this.Listener, subscriptionSet.ChannelNames.ToArray(), subscriptionSet.ChannelGroupNames.ToArray()); return this; } - SubscriptionSet RemoveSubscription(Subscription subscription) + public SubscriptionSet Remove(SubscriptionSet subscriptionSet) { - // CRUD + SubscriptionList = this.SubscriptionList.Except(subscriptionSet.SubscriptionList).ToList(); + if (subscriptionSet.ChannelNames.Count > 0) { + foreach (var c in subscriptionSet.ChannelNames) { + this.ChannelNames.Remove(c); + } + } + if (subscriptionSet.ChannelGroupNames.Count > 0) { + foreach (var g in subscriptionSet.ChannelGroupNames) { + this.ChannelGroupNames.Remove(g); + } + } + this.EventEmitter.RemoveListener(this.Listener, subscriptionSet.ChannelNames.ToArray(), subscriptionSet.ChannelGroupNames.ToArray()); return this; } } diff --git a/src/Api/PubnubApi/EventEngine/Common/EventEmitter.cs b/src/Api/PubnubApi/EventEngine/Common/EventEmitter.cs index b78bc6d9a..b38f033bb 100644 --- a/src/Api/PubnubApi/EventEngine/Common/EventEmitter.cs +++ b/src/Api/PubnubApi/EventEngine/Common/EventEmitter.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Globalization; using PubnubApi.EndPoint; using PubnubApi.EventEngine.Subscribe.Common; @@ -16,6 +17,8 @@ public class EventEmitter private IJsonPluggableLibrary jsonLibrary; private Pubnub instance; private TokenManager tokenManager; + private Dictionary> channelListenersMap; + private Dictionary> channelGroupListenersMap; public EventEmitter(PNConfiguration configuration, List listenerCallbacks, IJsonPluggableLibrary jsonPluggableLibrary, TokenManager tokenManager, IPubnubLog log, Pubnub instance) { @@ -25,6 +28,8 @@ public EventEmitter(PNConfiguration configuration, List liste this.tokenManager = tokenManager; jsonLibrary = jsonPluggableLibrary; listeners = listenerCallbacks; + channelGroupListenersMap = new Dictionary>(); + channelListenersMap = new Dictionary>(); } private TimetokenMetadata GetTimetokenMetadata(object t) @@ -47,6 +52,39 @@ private TimetokenMetadata GetTimetokenMetadata(object t) return null; } + public void AddListener(SubscribeCallback listener, string[] channels, string[] groups) + { + foreach (var c in channels.Where(c => !c.EndsWith("-pnpres"))) { + if (channelListenersMap.ContainsKey(c)) { + channelListenersMap[c].Add(listener); + } else { + channelListenersMap[c] = new List { listener }; + } + } + + foreach (var cg in groups.Where(cg => !cg.EndsWith("-pnpres"))) { + if (channelGroupListenersMap.ContainsKey(cg)) { + channelGroupListenersMap[cg].Add(listener); + } else { + channelGroupListenersMap[cg] = new List { listener }; + } + } + } + + public void RemoveListener(SubscribeCallback listener, string[] channels, string[] groups) + { + foreach (var c in channels.Where(c => !c.EndsWith("-pnpres"))) { + if (channelListenersMap.ContainsKey(c)) { + channelListenersMap[c].Remove(listener); + } + } + foreach (var cg in groups.Where(cg => !cg.EndsWith("-pnpres"))) { + if (channelGroupListenersMap.ContainsKey(cg)) { + channelGroupListenersMap[cg].Remove(listener); + } + } + } + public void EmitEvent(object e) { Message eventData = e as Message; @@ -140,6 +178,16 @@ public void EmitEvent(object e) foreach (var listener in listeners) { listener?.Signal(instance, signalMessage); } + if (!string.IsNullOrEmpty(signalMessage.Channel) && channelListenersMap.ContainsKey(signalMessage.Channel)) { + foreach (var l in channelListenersMap[signalMessage.Channel]) { + l?.Signal(instance, signalMessage); + } + } + if (!string.IsNullOrEmpty(signalMessage.Subscription) && channelListenersMap.ContainsKey(signalMessage.Subscription)) { + foreach (var l in channelGroupListenersMap[signalMessage.Subscription]) { + l?.Signal(instance, signalMessage); + } + } } } else if (eventData.MessageType == 2) { ResponseBuilder responseBuilder = new ResponseBuilder(configuration, jsonLibrary, log); @@ -148,6 +196,11 @@ public void EmitEvent(object e) foreach (var listener in listeners) { listener?.ObjectEvent(instance, objectApiEvent); } + if (!string.IsNullOrEmpty(objectApiEvent.Channel) && channelListenersMap.ContainsKey(objectApiEvent.Channel)) { + foreach (var l in channelListenersMap[objectApiEvent.Channel]) { + l?.ObjectEvent(instance, objectApiEvent); + } + } } } else if (eventData.MessageType == 3) { ResponseBuilder responseBuilder = new ResponseBuilder(configuration, jsonLibrary, log); @@ -156,6 +209,11 @@ public void EmitEvent(object e) foreach (var listener in listeners) { listener?.MessageAction(instance, messageActionEvent); } + if (!string.IsNullOrEmpty(messageActionEvent.Channel) && channelListenersMap.ContainsKey(messageActionEvent.Channel)) { + foreach (var l in channelListenersMap[messageActionEvent.Channel]) { + l?.MessageAction(instance, messageActionEvent); + } + } } } else if (eventData.MessageType == 4) { ResponseBuilder responseBuilder = new ResponseBuilder(configuration, jsonLibrary, log); @@ -190,24 +248,53 @@ public void EmitEvent(object e) foreach (var listener in listeners) { listener?.File(instance, fileMessage); } + if (!string.IsNullOrEmpty(fileMessage.Channel) && channelListenersMap.ContainsKey(fileMessage.Channel)) { + foreach (var l in channelListenersMap[fileMessage.Channel]) { + l?.File(instance, fileMessage); + } + } + if (!string.IsNullOrEmpty(fileMessage.Subscription) && channelListenersMap.ContainsKey(fileMessage.Subscription)) { + foreach (var l in channelGroupListenersMap[fileMessage.Subscription]) { + l?.File(instance, fileMessage); + } + } } - } - else if (currentMessageChannel.Contains("-pnpres")) { + } else if (currentMessageChannel.Contains("-pnpres")) { ResponseBuilder responseBuilder = new ResponseBuilder(configuration, jsonLibrary, log); PNPresenceEventResult presenceEvent = responseBuilder.JsonToObject(payloadContainer, true); if (presenceEvent != null) { foreach (var listener in listeners) { listener?.Presence(instance, presenceEvent); } + if (!string.IsNullOrEmpty(presenceEvent.Channel) && channelListenersMap.ContainsKey(presenceEvent.Channel)) { + foreach (var l in channelListenersMap[presenceEvent.Channel]) { + l?.Presence(instance, presenceEvent); + } + } + if (!string.IsNullOrEmpty(presenceEvent.Subscription) && channelListenersMap.ContainsKey(presenceEvent.Subscription)) { + foreach (var l in channelGroupListenersMap[presenceEvent.Subscription]) { + l?.Presence(instance, presenceEvent); + } + } } - } - else { + + } else { ResponseBuilder responseBuilder = new ResponseBuilder(configuration, jsonLibrary, log); PNMessageResult message = responseBuilder.JsonToObject>(payloadContainer, true); if (message != null) { foreach (var listener in listeners) { listener?.Message(instance, message); } + if (!string.IsNullOrEmpty(message.Channel) && channelListenersMap.ContainsKey(message.Channel)) { + foreach (var l in channelListenersMap[message.Channel]) { + l?.Message(instance, message); + } + } + if (!string.IsNullOrEmpty(message.Subscription) && channelListenersMap.ContainsKey(message.Subscription)) { + foreach (var l in channelGroupListenersMap[message.Subscription]) { + l?.Message(instance, message); + } + } } } } diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs b/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs index 5b49f0039..6f086a0b4 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs @@ -17,15 +17,15 @@ public class SubscribeEventEngine : Engine private readonly Dictionary channelGroupTypeMap = new Dictionary(); private readonly IJsonPluggableLibrary jsonPluggableLibrary; - public string[] Channels { get; set; } = new string[] {}; - public string[] Channelgroups { get; set; } = new string[] {}; + public List Channels { get; set; } = new List(); + public List ChannelGroups { get; set; } = new List(); internal SubscribeEventEngine(Pubnub pubnubInstance, PNConfiguration pubnubConfiguration, SubscribeManager2 subscribeManager, - EventEmitter eventEmitter, + EventEmitter eventEmitter, IJsonPluggableLibrary jsonPluggableLibrary, - Action statusListener = null) + Action statusListener = null) { this.subscribeManager = subscribeManager; this.jsonPluggableLibrary = jsonPluggableLibrary; @@ -55,27 +55,25 @@ internal SubscribeEventEngine(Pubnub pubnubInstance, } public void Subscribe(string[] channels, string[] channelGroups, SubscriptionCursor cursor) { - foreach (var c in channels) - { + Channels.AddRange(channels); + ChannelGroups.AddRange(channelGroups); + + foreach (var c in channels) { channelTypeMap[c] = typeof(T); } - foreach (var c in channelGroups) - { + foreach (var c in channelGroups) { channelGroupTypeMap[c] = typeof(T); } - if (cursor != null) - { + if (cursor != null) { EventQueue.Enqueue(new SubscriptionRestoredEvent() { - Channels = Channels.Concat(channels ?? new string[] {}), - ChannelGroups = Channelgroups.Concat(channelGroups ?? new string[] {}), + Channels = Channels.Concat(channels ?? new string[] { }).Distinct(), + ChannelGroups = ChannelGroups.Concat(channelGroups ?? new string[] { }).Distinct(), Cursor = cursor }); - } - else - { + } else { EventQueue.Enqueue(new SubscriptionChangedEvent() { - Channels = Channels.Concat(channels??new string[] {}), - ChannelGroups = Channelgroups.Concat(channelGroups?? new string[] {}) + Channels = Channels.Concat(channels ?? new string[] { }).Distinct(), + ChannelGroups = ChannelGroups.Concat(channelGroups ?? new string[] { }).Distinct() }); } } @@ -84,7 +82,7 @@ public void Subscribe(string[] channels, string[] channelGroups, SubscriptionCur { Subscribe(channels, channelGroups, cursor); } - + public void UnsubscribeAll() { EventQueue.Enqueue(new UnsubscribeAllEvent()); @@ -92,17 +90,9 @@ public void UnsubscribeAll() public void Unsubscribe(string[] channels, string[] channelGroups) { - var channelNames = new List(); - var groupNames = new List(); - foreach (var channel in channels?? Enumerable.Empty()) { - channelNames.Add($"{channel}-pnpres"); - } - foreach (var cg in channelGroups ?? Enumerable.Empty()) { - groupNames.Add($"{cg}-pnpres"); - } this.EventQueue.Enqueue(new SubscriptionChangedEvent() { - Channels = (this.currentState as SubscriptionState).Channels?.Except(channels?.Union(channelNames)??Enumerable.Empty()), - ChannelGroups = (this.currentState as SubscriptionState).ChannelGroups?.Except(channelGroups?.Union(groupNames)??Enumerable.Empty()) + Channels = this.Channels?.Except(channels ?? Enumerable.Empty()), + ChannelGroups = this.ChannelGroups?.Except(channelGroups ?? Enumerable.Empty()) }); } } diff --git a/src/Api/PubnubApi/Model/Derived/Pubsub/SubscribeCallbackExt.cs b/src/Api/PubnubApi/Model/Derived/Pubsub/SubscribeCallbackExt.cs index 5817f6193..4fa316227 100644 --- a/src/Api/PubnubApi/Model/Derived/Pubsub/SubscribeCallbackExt.cs +++ b/src/Api/PubnubApi/Model/Derived/Pubsub/SubscribeCallbackExt.cs @@ -7,181 +7,220 @@ namespace PubnubApi { - public class SubscribeCallbackExt : SubscribeCallback - { - readonly Action> subscribeAction; - readonly Action presenceAction; - readonly Action> signalAction; - readonly Action statusAction; - readonly Action objectEventAction; - readonly Action messageAction; - readonly Action fileAction; - - public SubscribeCallbackExt(Action> messageCallback, Action presenceCallback, Action statusCallback) - { - subscribeAction = messageCallback; - presenceAction = presenceCallback; - statusAction = statusCallback; - signalAction = null; - objectEventAction = null; - fileAction = null; - } - - public SubscribeCallbackExt(Action> signalCallback, Action statusCallback) - { - subscribeAction = null; - presenceAction = null; - statusAction = statusCallback; - signalAction = signalCallback; - objectEventAction = null; - fileAction = null; - } - - public SubscribeCallbackExt(Action objectEventCallback, Action statusCallback) - { - subscribeAction = null; - presenceAction = null; - signalAction = null; - statusAction = statusCallback; - objectEventAction = objectEventCallback; - fileAction = null; - } - - public SubscribeCallbackExt(Action messageActionCallback, Action statusCallback) - { - subscribeAction = null; - presenceAction = null; - signalAction = null; - statusAction = statusCallback; - objectEventAction = null; - messageAction = messageActionCallback; - fileAction = null; - } - - public SubscribeCallbackExt(Action fileCallback, Action statusCallback) - { - subscribeAction = null; - presenceAction = null; - statusAction = statusCallback; - signalAction = null; - objectEventAction = null; - fileAction = fileCallback; - } - - public SubscribeCallbackExt(Action> messageCallback, - Action presenceCallback, - Action> signalCallback, - Action statusCallback) - { - subscribeAction = messageCallback; - presenceAction = presenceCallback; - statusAction = statusCallback; - signalAction = signalCallback; - objectEventAction = null; - } - - public SubscribeCallbackExt(Action> messageCallback, - Action presenceCallback, - Action> signalCallback, - Action objectEventCallback, - Action statusCallback) - { - subscribeAction = messageCallback; - presenceAction = presenceCallback; - statusAction = statusCallback; - signalAction = signalCallback; - objectEventAction = objectEventCallback; - } - - public SubscribeCallbackExt(Action> messageCallback, - Action presenceCallback, - Action> signalCallback, - Action objectEventCallback, - Action messageActionCallback, - Action statusCallback) - { - subscribeAction = messageCallback; - presenceAction = presenceCallback; - statusAction = statusCallback; - signalAction = signalCallback; - objectEventAction = objectEventCallback; - messageAction = messageActionCallback; - } - - public SubscribeCallbackExt(Action> messageCallback, - Action presenceCallback, - Action> signalCallback, - Action objectEventCallback, - Action messageActionCallback, - Action fileCallback, - Action statusCallback) - { - subscribeAction = messageCallback; - presenceAction = presenceCallback; - statusAction = statusCallback; - signalAction = signalCallback; - objectEventAction = objectEventCallback; - messageAction = messageActionCallback; - fileAction = fileCallback; - } - - public override void Message(Pubnub pubnub, PNMessageResult message) - { - PNMessageResult message1 = new PNMessageResult(); - message1.Channel = message.Channel; - message1.Message = (T)(object)message.Message; - message1.Subscription = message.Subscription; - message1.Timetoken = message.Timetoken; - message1.UserMetadata = message.UserMetadata; - message1.Publisher = message.Publisher; - - subscribeAction?.Invoke(pubnub, message1); - } - - public override void Presence(Pubnub pubnub, PNPresenceEventResult presence) - { - presenceAction?.Invoke(pubnub, presence); - } - - public override void Status(Pubnub pubnub, PNStatus status) - { - statusAction?.Invoke(pubnub, status); - } - - public override void Signal(Pubnub pubnub, PNSignalResult signalMessage) - { - PNSignalResult message1 = new PNSignalResult(); - message1.Channel = signalMessage.Channel; - message1.Message = (T)(object)signalMessage.Message; - message1.Subscription = signalMessage.Subscription; - message1.Timetoken = signalMessage.Timetoken; - message1.UserMetadata = signalMessage.UserMetadata; - message1.Publisher = signalMessage.Publisher; - - signalAction?.Invoke(pubnub, message1); - } - - public override void ObjectEvent(Pubnub pubnub, PNObjectEventResult objectEvent) - { - objectEventAction?.Invoke(pubnub, objectEvent); - } - - public override void MessageAction(Pubnub pubnub, PNMessageActionEventResult messageActionEvent) - { - messageAction?.Invoke(pubnub, messageActionEvent); - } - - public override void File(Pubnub pubnub, PNFileEventResult fileEvent) - { - PNFileEventResult message1 = new PNFileEventResult(); - message1.Channel = fileEvent.Channel; - message1.Message = fileEvent.Message; - message1.Subscription = fileEvent.Subscription; - message1.Timetoken = fileEvent.Timetoken; - message1.Publisher = fileEvent.Publisher; - message1.File = fileEvent.File; - - fileAction?.Invoke(pubnub, message1); - } - } + public class SubscribeCallbackExt : SubscribeCallback + { + public Action> subscribeAction; + public Action presenceAction; + public Action> signalAction; + public Action statusAction; + public Action objectEventAction; + public Action messageAction; + public Action fileAction; + + public SubscribeCallbackExt() + { + subscribeAction = null; + presenceAction = null; + statusAction = null; + signalAction = null; + objectEventAction = null; + fileAction = null; + } + public SubscribeCallbackExt(Action> messageCallback, Action presenceCallback, Action statusCallback) + { + subscribeAction = messageCallback; + presenceAction = presenceCallback; + statusAction = statusCallback; + signalAction = null; + objectEventAction = null; + fileAction = null; + } + + public SubscribeCallbackExt(Action> messageCallback) + { + subscribeAction = messageCallback; + presenceAction = null; + statusAction = null; + signalAction = null; + objectEventAction = null; + fileAction = null; + } + + public SubscribeCallbackExt(Action presenceCallback) + { + subscribeAction = null; + presenceAction = presenceCallback; + statusAction = null; + signalAction = null; + objectEventAction = null; + fileAction = null; + } + + public SubscribeCallbackExt(Action> messageCallback, Action presenceCallback) + { + subscribeAction = messageCallback; + presenceAction = presenceCallback; + statusAction = null; + signalAction = null; + objectEventAction = null; + fileAction = null; + } + + public SubscribeCallbackExt(Action> signalCallback, Action statusCallback) + { + subscribeAction = null; + presenceAction = null; + statusAction = statusCallback; + signalAction = signalCallback; + objectEventAction = null; + fileAction = null; + } + + public SubscribeCallbackExt(Action objectEventCallback, Action statusCallback) + { + subscribeAction = null; + presenceAction = null; + signalAction = null; + statusAction = statusCallback; + objectEventAction = objectEventCallback; + fileAction = null; + } + + public SubscribeCallbackExt(Action messageActionCallback, Action statusCallback) + { + subscribeAction = null; + presenceAction = null; + signalAction = null; + statusAction = statusCallback; + objectEventAction = null; + messageAction = messageActionCallback; + fileAction = null; + } + + public SubscribeCallbackExt(Action fileCallback, Action statusCallback) + { + subscribeAction = null; + presenceAction = null; + statusAction = statusCallback; + signalAction = null; + objectEventAction = null; + fileAction = fileCallback; + } + + public SubscribeCallbackExt(Action> messageCallback, + Action presenceCallback, + Action> signalCallback, + Action statusCallback) + { + subscribeAction = messageCallback; + presenceAction = presenceCallback; + statusAction = statusCallback; + signalAction = signalCallback; + objectEventAction = null; + } + + public SubscribeCallbackExt(Action> messageCallback, + Action presenceCallback, + Action> signalCallback, + Action objectEventCallback, + Action statusCallback) + { + subscribeAction = messageCallback; + presenceAction = presenceCallback; + statusAction = statusCallback; + signalAction = signalCallback; + objectEventAction = objectEventCallback; + } + + public SubscribeCallbackExt(Action> messageCallback, + Action presenceCallback, + Action> signalCallback, + Action objectEventCallback, + Action messageActionCallback, + Action statusCallback) + { + subscribeAction = messageCallback; + presenceAction = presenceCallback; + statusAction = statusCallback; + signalAction = signalCallback; + objectEventAction = objectEventCallback; + messageAction = messageActionCallback; + } + + public SubscribeCallbackExt(Action> messageCallback, + Action presenceCallback, + Action> signalCallback, + Action objectEventCallback, + Action messageActionCallback, + Action fileCallback, + Action statusCallback) + { + subscribeAction = messageCallback; + presenceAction = presenceCallback; + statusAction = statusCallback; + signalAction = signalCallback; + objectEventAction = objectEventCallback; + messageAction = messageActionCallback; + fileAction = fileCallback; + } + + public override void Message(Pubnub pubnub, PNMessageResult message) + { + PNMessageResult message1 = new PNMessageResult(); + message1.Channel = message.Channel; + message1.Message = (T)(object)message.Message; + message1.Subscription = message.Subscription; + message1.Timetoken = message.Timetoken; + message1.UserMetadata = message.UserMetadata; + message1.Publisher = message.Publisher; + + subscribeAction?.Invoke(pubnub, message1); + } + + public override void Presence(Pubnub pubnub, PNPresenceEventResult presence) + { + presenceAction?.Invoke(pubnub, presence); + } + + public override void Status(Pubnub pubnub, PNStatus status) + { + statusAction?.Invoke(pubnub, status); + } + + public override void Signal(Pubnub pubnub, PNSignalResult signalMessage) + { + PNSignalResult message1 = new PNSignalResult(); + message1.Channel = signalMessage.Channel; + message1.Message = (T)(object)signalMessage.Message; + message1.Subscription = signalMessage.Subscription; + message1.Timetoken = signalMessage.Timetoken; + message1.UserMetadata = signalMessage.UserMetadata; + message1.Publisher = signalMessage.Publisher; + + signalAction?.Invoke(pubnub, message1); + } + + public override void ObjectEvent(Pubnub pubnub, PNObjectEventResult objectEvent) + { + objectEventAction?.Invoke(pubnub, objectEvent); + } + + public override void MessageAction(Pubnub pubnub, PNMessageActionEventResult messageActionEvent) + { + messageAction?.Invoke(pubnub, messageActionEvent); + } + + public override void File(Pubnub pubnub, PNFileEventResult fileEvent) + { + PNFileEventResult message1 = new PNFileEventResult(); + message1.Channel = fileEvent.Channel; + message1.Message = fileEvent.Message; + message1.Subscription = fileEvent.Subscription; + message1.Timetoken = fileEvent.Timetoken; + message1.Publisher = fileEvent.Publisher; + message1.File = fileEvent.File; + + fileAction?.Invoke(pubnub, message1); + } + } } diff --git a/src/Api/PubnubApi/Pubnub.cs b/src/Api/PubnubApi/Pubnub.cs index 23061138f..32c9f63f4 100644 --- a/src/Api/PubnubApi/Pubnub.cs +++ b/src/Api/PubnubApi/Pubnub.cs @@ -14,6 +14,7 @@ #endif using PubnubApi.Security.Crypto; using PubnubApi.Security.Crypto.Cryptors; +using PubnubApi.EventEngine.Common; namespace PubnubApi { @@ -29,6 +30,7 @@ public class Pubnub private readonly string savedSdkVerion; private SubscribeEventEngineFactory subscribeEventEngineFactory; private PresenceEventEngineFactory presenceEventengineFactory; + private EventEmitter eventEmitter; private List subscribeCallbackListenerList { get; @@ -72,6 +74,7 @@ public ISubscribeOperation Subscribe() presenceOperation = new PresenceOperation(this, InstanceId, pubnubLog, pubnubConfig.ContainsKey(InstanceId) ? pubnubConfig[InstanceId] : null ,telemetryManager, tokenManager, pubnubUnitTest ,presenceEventengineFactory); } EndPoint.SubscribeEndpoint subscribeOperation = new EndPoint.SubscribeEndpoint(pubnubConfig.ContainsKey(InstanceId) ? pubnubConfig[InstanceId] : null, JsonPluggableLibrary, pubnubUnitTest, pubnubLog, null, tokenManager, this.subscribeEventEngineFactory, presenceOperation, InstanceId ,this); + subscribeOperation.EventEmitter = this.eventEmitter; subscribeOperation.SubscribeListenerList = subscribeCallbackListenerList; savedSubscribeOperation = subscribeOperation; return subscribeOperation; @@ -968,6 +971,11 @@ public void SetJsonPluggableLibrary(IJsonPluggableLibrary customJson) public string InstanceId { get; private set; } + public Channel Channel(string name) => new Channel(name, this, eventEmitter); + public ChannelGroup ChannelGroup(string name) => new ChannelGroup(name, this, eventEmitter); + + public SubscriptionSet SubscriptionSet(string[] channels, string[] channelGroups, SubscriptionOptions options) => new SubscriptionSet(channels, channelGroups, options, this, eventEmitter); + #endregion #region "Constructors" @@ -998,10 +1006,10 @@ public Pubnub(PNConfiguration config) } CheckAndInitializeEmptyStringValues(config); tokenManager = new EndPoint.TokenManager(pubnubConfig[InstanceId], JsonPluggableLibrary, pubnubLog, this.InstanceId); - + //Initialize JsonPluggableLibrary JsonPluggableLibrary = new NewtonsoftJsonDotNet(config, pubnubLog); - + //Check PresenceTimeout if (config.PresenceTimeout < 20) { @@ -1014,7 +1022,7 @@ public Pubnub(PNConfiguration config) //Check required UserId CheckRequiredUserId(config); - + eventEmitter = new EventEmitter(pubnubConfig.ContainsKey(InstanceId) ? pubnubConfig[InstanceId] : null, subscribeCallbackListenerList, JsonPluggableLibrary, tokenManager, pubnubLog, this); //Check CryptoModule usage CheckCryptoModuleUsageForLogging(config); diff --git a/src/Api/PubnubApiPCL/PubnubApiPCL.csproj b/src/Api/PubnubApiPCL/PubnubApiPCL.csproj index d069f219b..6f20a08ed 100644 --- a/src/Api/PubnubApiPCL/PubnubApiPCL.csproj +++ b/src/Api/PubnubApiPCL/PubnubApiPCL.csproj @@ -277,6 +277,7 @@ + HttpUtility\HttpUtility.cs diff --git a/src/Api/PubnubApiUnity/PubnubApiUnity.csproj b/src/Api/PubnubApiUnity/PubnubApiUnity.csproj index 350460416..dda603755 100644 --- a/src/Api/PubnubApiUnity/PubnubApiUnity.csproj +++ b/src/Api/PubnubApiUnity/PubnubApiUnity.csproj @@ -213,6 +213,12 @@ + + + + + + EndPoint\PubSub\UnsubscribeAllOperation.cs