Skip to content

Commit

Permalink
set EnableEventEngine and fix relevant issues
Browse files Browse the repository at this point in the history
  • Loading branch information
mohitpubnub committed Jun 10, 2024
1 parent 4c5ae5d commit 045ba56
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 13 deletions.
36 changes: 27 additions & 9 deletions src/Api/PubnubApi/EventEngine/Common/EventEmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using PubnubApi.EventEngine.Subscribe.Common;
using PubnubApi.Security.Crypto;
using PubnubApi.Security.Crypto.Cryptors;
using Newtonsoft.Json.Linq;

namespace PubnubApi.EventEngine.Common
{
Expand Down Expand Up @@ -95,14 +96,21 @@ public void EmitEvent<T>(object e)
if (currentMessageChannel.Replace("-pnpres", "") == currentMessageChannelGroup?.Replace("-pnpres", "")) {
currentMessageChannelGroup = "";
}
object payload = eventData.Payload;
object payload;
string payloadAsString = eventData.Payload as string;
if (payloadAsString != null) {
var jsonObject = jsonLibrary.BuildJsonObject(payloadAsString.ToString());
payload = jsonObject ?? payloadAsString;
} else {
payload = eventData.Payload;
}
List<object> payloadContainer = new List<object>(); //First item always message
if (currentMessageChannel.Contains("-pnpres") || currentMessageChannel.Contains(".*-pnpres")) {
payloadContainer.Add(payload);
} else if (eventData.MessageType == 2) //Objects Simplification events
{
double objectsVersion = -1;
Dictionary<string, object> objectsDic = payload as Dictionary<string, object>;
Dictionary<string, object> objectsDic = payload as Dictionary<string, object> ?? (payload as JObject).ToObject<Dictionary<string, object>>();
if (objectsDic != null
&& objectsDic.ContainsKey("source")
&& objectsDic.ContainsKey("version")
Expand All @@ -114,10 +122,10 @@ public void EmitEvent<T>(object e)
}
}
} else {
if ((configuration.CryptoModule != null || configuration.CipherKey.Length > 0) && eventData.MessageType != 1) //decrypt the subscriber message if cipherkey is available
if ((configuration.CryptoModule != null || configuration.CipherKey.Length > 0) && (eventData.MessageType == 0 || eventData.MessageType == 4)) //decrypt the subscriber message if cipherkey is available
{
string decryptMessage = "";
configuration.CryptoModule ??= new CryptoModule(new LegacyCryptor(configuration.CipherKey, configuration.UseRandomInitializationVector, log), null);
configuration.CryptoModule ??= new CryptoModule(new AesCbcCryptor(configuration.CipherKey, log), new List<ICryptor>() { new LegacyCryptor(configuration.CipherKey, configuration.UseRandomInitializationVector) });
try {
decryptMessage = configuration.CryptoModule.Decrypt(payload.ToString());
} catch (Exception ex) {
Expand Down Expand Up @@ -150,7 +158,7 @@ public void EmitEvent<T>(object e)

payloadContainer.Add(userMetaData); //Second one always user meta data

payloadContainer.Add(GetTimetokenMetadata(eventData.PublishMetadata)); //Third one always Timetoken
payloadContainer.Add(GetTimetokenMetadata(eventData.PublishMetadata).Timetoken); //Third one always Timetoken

payloadContainer.Add(eventData.IssuingClientId); //Fourth one always Publisher

Expand Down Expand Up @@ -183,7 +191,7 @@ public void EmitEvent<T>(object e)
l?.Signal(instance, signalMessage);
}
}
if (!string.IsNullOrEmpty(signalMessage.Subscription) && channelListenersMap.ContainsKey(signalMessage.Subscription)) {
if (!string.IsNullOrEmpty(signalMessage.Subscription) && channelGroupListenersMap.ContainsKey(signalMessage.Subscription)) {
foreach (var l in channelGroupListenersMap[signalMessage.Subscription]) {
l?.Signal(instance, signalMessage);
}
Expand All @@ -201,6 +209,11 @@ public void EmitEvent<T>(object e)
l?.ObjectEvent(instance, objectApiEvent);
}
}
if (!string.IsNullOrEmpty(objectApiEvent.Subscription) && channelGroupListenersMap.ContainsKey(objectApiEvent.Subscription)) {
foreach (var l in channelGroupListenersMap[objectApiEvent.Subscription]) {
l?.ObjectEvent(instance, objectApiEvent);
}
}
}
} else if (eventData.MessageType == 3) {
ResponseBuilder responseBuilder = new ResponseBuilder(configuration, jsonLibrary, log);
Expand All @@ -214,6 +227,11 @@ public void EmitEvent<T>(object e)
l?.MessageAction(instance, messageActionEvent);
}
}
if (!string.IsNullOrEmpty(messageActionEvent.Subscription) && channelGroupListenersMap.ContainsKey(messageActionEvent.Subscription)) {
foreach (var l in channelGroupListenersMap[messageActionEvent.Subscription]) {
l?.MessageAction(instance, messageActionEvent);
}
}
}
} else if (eventData.MessageType == 4) {
ResponseBuilder responseBuilder = new ResponseBuilder(configuration, jsonLibrary, log);
Expand Down Expand Up @@ -253,7 +271,7 @@ public void EmitEvent<T>(object e)
l?.File(instance, fileMessage);
}
}
if (!string.IsNullOrEmpty(fileMessage.Subscription) && channelListenersMap.ContainsKey(fileMessage.Subscription)) {
if (!string.IsNullOrEmpty(fileMessage.Subscription) && channelGroupListenersMap.ContainsKey(fileMessage.Subscription)) {
foreach (var l in channelGroupListenersMap[fileMessage.Subscription]) {
l?.File(instance, fileMessage);
}
Expand All @@ -271,7 +289,7 @@ public void EmitEvent<T>(object e)
l?.Presence(instance, presenceEvent);
}
}
if (!string.IsNullOrEmpty(presenceEvent.Subscription) && channelListenersMap.ContainsKey(presenceEvent.Subscription)) {
if (!string.IsNullOrEmpty(presenceEvent.Subscription) && channelGroupListenersMap.ContainsKey(presenceEvent.Subscription)) {
foreach (var l in channelGroupListenersMap[presenceEvent.Subscription]) {
l?.Presence(instance, presenceEvent);
}
Expand All @@ -290,7 +308,7 @@ public void EmitEvent<T>(object e)
l?.Message(instance, message);
}
}
if (!string.IsNullOrEmpty(message.Subscription) && channelListenersMap.ContainsKey(message.Subscription)) {
if (!string.IsNullOrEmpty(message.Subscription) && channelGroupListenersMap.ContainsKey(message.Subscription)) {
foreach (var l in channelGroupListenersMap[message.Subscription]) {
l?.Message(instance, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@ internal static PNMessageActionEventResult GetObject(IJsonPluggableLibrary jsonP
}
}
result.Uuid = listObject[3].ToString();
result.Channel = listObject[4].ToString();
if (listObject.Count == 6) {
result.Subscription = listObject[4].ToString();
result.Channel = listObject[5].ToString();
} else if (listObject.Count == 5) {
result.Channel = listObject[4].ToString();
}

}

return result;
Expand Down
8 changes: 6 additions & 2 deletions src/Api/PubnubApi/JsonDataParse/PNObjectEventJsonDataParse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ internal static PNObjectEventResult GetObject(IJsonPluggableLibrary jsonPlug, Li
}
}
}

result.Channel = (listObject.Count == 6) ? listObject[5].ToString() : listObject[4].ToString();
if (listObject.Count == 6) {
result.Subscription = listObject[4].ToString();
result.Channel = listObject[5].ToString();
} else if (listObject.Count == 5) {
result.Channel = listObject[4].ToString();
}
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ public class PNMessageActionEventResult
public PNMessageAction Action { get; internal set; }
public string Uuid { get; internal set; }
public string Channel { get; internal set; }
public string Subscription { get; internal set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ public PNObjectEventResult()
public PNChannelMetadataResult ChannelMetadata { get; internal set; } //Populate when Type = channel
public long Timestamp { get; internal set; }
public string Channel { get; internal set; } //Subscribed channel
public string Subscription { get; internal set; }
}
}
2 changes: 1 addition & 1 deletion src/Api/PubnubApi/PNConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public PNReconnectionPolicy ReconnectionPolicy {

public bool MaintainPresenceState { get; set; } = true;

public bool EnableEventEngine { get; set; }
public bool EnableEventEngine { get; set; } = true;

public int FileMessagePublishRetryLimit { get; set; }

Expand Down

0 comments on commit 045ba56

Please sign in to comment.