Skip to content

Commit

Permalink
Address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng committed Dec 7, 2023
1 parent 56ea7fe commit e332770
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,12 @@ public class ActiveMQConnectorConfig implements Serializable {

private String activeMessageType = ActiveMQTextMessage.class.getSimpleName();

public static ActiveMQConnectorConfig load(Map<String, Object> map, SinkContext sinkContext,
SourceContext sourceContext) throws IOException {
if (sinkContext != null) {
return IOConfigUtils.loadWithSecrets(map, ActiveMQConnectorConfig.class, sinkContext);
} else if (sourceContext != null) {
return IOConfigUtils.loadWithSecrets(map, ActiveMQConnectorConfig.class, sourceContext);
} else {
throw new IllegalArgumentException("Either SinkContext or SourceContext must be set.");
}
public static ActiveMQConnectorConfig load(Map<String, Object> map, SourceContext sourceContext) throws IOException {
return IOConfigUtils.loadWithSecrets(map, ActiveMQConnectorConfig.class, sourceContext);
}

public static ActiveMQConnectorConfig load(Map<String, Object> map, SinkContext sinkContext) throws IOException {
return IOConfigUtils.loadWithSecrets(map, ActiveMQConnectorConfig.class, sinkContext);
}

public void validate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void open(Map<String, Object> map, SinkContext sinkContext) throws Except
throw new IllegalStateException("Connector is already open");
}

config = ActiveMQConnectorConfig.load(map, sinkContext, null);
config = ActiveMQConnectorConfig.load(map, sinkContext);
config.validate();

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getBrokerUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void open(Map<String, Object> map, SourceContext sourceContext) throws Ex
throw new IllegalStateException("Connector is already open");
}

config = ActiveMQConnectorConfig.load(map, null, sourceContext);
config = ActiveMQConnectorConfig.load(map, sourceContext);
config.validate();

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getBrokerUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class ConnectorConfigTest extends ActiveMQConnectorTestBase {
@Test
public void loadBasicConfigTest() throws IOException {
SinkContext sinkContext = Mockito.mock(SinkContext.class);
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(queueConfig, sinkContext, null);
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(queueConfig, sinkContext);
activeMQConnectorConfig.validate();

Assert.assertEquals("tcp", activeMQConnectorConfig.getProtocol());
Expand All @@ -53,7 +53,7 @@ public void loadBasicConfigAndCredentialFromSecretTest() throws IOException {
.thenReturn("guest");
Mockito.when(sinkContext.getSecret("password"))
.thenReturn("guest");
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(queueConfig, sinkContext, null);
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(queueConfig, sinkContext);
activeMQConnectorConfig.validate();

Assert.assertEquals("tcp", activeMQConnectorConfig.getProtocol());
Expand All @@ -72,7 +72,7 @@ public void loadBasicConfigAndCredentialFromSecretForSourceTest() throws IOExcep
Mockito.when(sourceContext.getSecret("password"))
.thenReturn("guest");
ActiveMQConnectorConfig activeMQConnectorConfig =
ActiveMQConnectorConfig.load(queueConfig, null, sourceContext);
ActiveMQConnectorConfig.load(queueConfig, sourceContext);
activeMQConnectorConfig.validate();

Assert.assertEquals("tcp", activeMQConnectorConfig.getProtocol());
Expand All @@ -86,7 +86,7 @@ public void loadBasicConfigAndCredentialFromSecretForSourceTest() throws IOExcep
@Test
public void loadQueueConfigTest() throws IOException {
SinkContext sinkContext = Mockito.mock(SinkContext.class);
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(queueConfig, sinkContext, null);
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(queueConfig, sinkContext);
activeMQConnectorConfig.validate();

Assert.assertEquals("test-queue", activeMQConnectorConfig.getQueueName());
Expand All @@ -96,7 +96,7 @@ public void loadQueueConfigTest() throws IOException {
@Test
public void loadTopicConfigTest() throws IOException {
SinkContext sinkContext = Mockito.mock(SinkContext.class);
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(topicConfig, sinkContext, null);
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(topicConfig, sinkContext);
activeMQConnectorConfig.validate();

Assert.assertEquals("test-topic", activeMQConnectorConfig.getTopicName());
Expand All @@ -106,13 +106,13 @@ public void loadTopicConfigTest() throws IOException {
@Test
public void loadMessageTypeConfig() throws IOException {
SinkContext sinkContext = Mockito.mock(SinkContext.class);
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(topicConfig, sinkContext, null);
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(topicConfig, sinkContext);
activeMQConnectorConfig.validate();

Assert.assertEquals(ActiveMQTextMessage.class.getSimpleName(), activeMQConnectorConfig.getActiveMessageType());

topicConfig.put("activeMessageType", ActiveMQBytesMessage.class.getSimpleName());
activeMQConnectorConfig = ActiveMQConnectorConfig.load(topicConfig, sinkContext, null);
activeMQConnectorConfig = ActiveMQConnectorConfig.load(topicConfig, sinkContext);
activeMQConnectorConfig.validate();

Assert.assertEquals(ActiveMQBytesMessage.class.getSimpleName(), activeMQConnectorConfig.getActiveMessageType());
Expand Down

0 comments on commit e332770

Please sign in to comment.