Skip to content

Commit

Permalink
added DefaultSolacePersistentMessageHandlerObserver to avoid redundan…
Browse files Browse the repository at this point in the history
…t null checks
  • Loading branch information
mynecker committed Oct 23, 2024
1 parent c6e9865 commit 5fec064
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.solace.maas.ep.event.management.agent.subscriber;

import com.solace.messaging.receiver.InboundMessage;

public class DefaultSolacePersistentMessageHandlerObserver implements SolacePersistentMessageHandlerObserver {

@Override
public void onMessageReceived(InboundMessage message) {
// Do nothing
}

@Override
public void onMessageProcessingInitiated(InboundMessage message) {
// Do nothing
}

@Override
public void onMessageProcessingCompleted(InboundMessage message) {
// Do nothing
}

@Override
public void onMessageProcessingAcknowledged(InboundMessage message) {
// Do nothing
}

@Override
public void onMessageProcessingFailed(InboundMessage message) {
// Do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ public class SolacePersistentMessageHandler extends BaseSolaceMessageHandler imp
private PersistentMessageReceiver persistentMessageReceiver;

// only used for testing
@Setter
private SolacePersistentMessageHandlerObserver messageHandlerObserver;

protected SolacePersistentMessageHandler(MessagingService messagingService,
EventPortalProperties eventPortalProperties,
List<MessageProcessor> messageProcessorList) {

super();
this.messageHandlerObserver = new DefaultSolacePersistentMessageHandlerObserver();
this.messagingService = messagingService;
this.eventPortalProperties = eventPortalProperties;
messageProcessorsByClassType = messageProcessorList.stream()
Expand All @@ -63,19 +63,22 @@ protected SolacePersistentMessageHandler(MessagingService messagingService,
}


public void setMessageHandlerObserver(SolacePersistentMessageHandlerObserver observer) {
this.messageHandlerObserver = observer;
if (this.messageHandlerObserver==null){
this.messageHandlerObserver = new DefaultSolacePersistentMessageHandlerObserver();
}
}

@Override
public void onMessage(InboundMessage inboundMessage) {
if (messageHandlerObserver != null) {
messageHandlerObserver.onMessageReceived(inboundMessage);
}
messageHandlerObserver.onMessageReceived(inboundMessage);
executor.submit(() -> processMessage(inboundMessage));
}


private void processMessage(InboundMessage inboundMessage) {
if (messageHandlerObserver != null) {
messageHandlerObserver.onMessageProcessingInitiated(inboundMessage);
}
messageHandlerObserver.onMessageProcessingInitiated(inboundMessage);
String mopMessageSubclass = "";
MessageProcessor processor = null;
Object message = null;
Expand All @@ -91,19 +94,13 @@ private void processMessage(InboundMessage inboundMessage) {
log.trace("onMessage: {}\n{}", messageClass, messageAsString);
message = toMessage(messageAsString, messageClass);
processor.processMessage(processor.castToMessageClass(message));
if (messageHandlerObserver != null) {
messageHandlerObserver.onMessageProcessingCompleted(inboundMessage);
}
messageHandlerObserver.onMessageProcessingCompleted(inboundMessage);
} catch (Exception e) {
handleProcessingError(mopMessageSubclass, processor, message, e);
if (messageHandlerObserver != null) {
messageHandlerObserver.onMessageProcessingFailed(inboundMessage);
}
messageHandlerObserver.onMessageProcessingFailed(inboundMessage);
} finally {
acknowledgeMessage(inboundMessage);
if (messageHandlerObserver != null) {
messageHandlerObserver.onMessageProcessingAcknowledged(inboundMessage);
}
messageHandlerObserver.onMessageProcessingAcknowledged(inboundMessage);
}
}

Expand Down

0 comments on commit 5fec064

Please sign in to comment.