From b40f24797376abf584d7915a9ede2187beb5a13f Mon Sep 17 00:00:00 2001 From: Denis Shvedchenko Date: Fri, 29 Apr 2016 17:28:33 +0300 Subject: [PATCH 1/5] issue31: proposed fix --- .../instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java index e85f8f1..eab3ebb 100644 --- a/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java +++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java @@ -79,7 +79,7 @@ protected void logUndefinedRow(final Object[] r, final String log, final String public void shutdown() throws IOException { if (channel.isOpen()) { - channel.close(); + channel.getConnection().close(); } } From 58181ceaf11e2e2ee0eb1c05dd603ba530868b11 Mon Sep 17 00:00:00 2001 From: Denis Shvedchenko Date: Fri, 29 Apr 2016 17:33:31 +0300 Subject: [PATCH 2/5] issue31: proposed fix, tuned test. need suggestion how to reach getConnection().close() with Mockito --- .../pentaho/plugin/amqp/processor/AbstractProcessorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/instaclick/pentaho/plugin/amqp/processor/AbstractProcessorTest.java b/src/test/java/com/instaclick/pentaho/plugin/amqp/processor/AbstractProcessorTest.java index fc37e9b..0de1b0a 100644 --- a/src/test/java/com/instaclick/pentaho/plugin/amqp/processor/AbstractProcessorTest.java +++ b/src/test/java/com/instaclick/pentaho/plugin/amqp/processor/AbstractProcessorTest.java @@ -121,7 +121,7 @@ public void testShutdownClosesChannel() throws IOException when(channel.isOpen()).thenReturn(true); processor.shutdown(); - verify(channel, times(1)).close(); + verify(channel, times(1)).getConnection(); } @Test From 754323cefc06f33e913f890fc5468bd8d9af756a Mon Sep 17 00:00:00 2001 From: Denis Shvedchenko Date: Fri, 29 Apr 2016 18:40:55 +0300 Subject: [PATCH 3/5] issue31: add mock for connection close() --- .gitignore | 2 ++ .../amqp/processor/AbstractProcessorTest.java | 20 +++++++++++++------ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 2f7896d..4ed60aa 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ target/ +.idea/ +ic-amqp-plugin.iml diff --git a/src/test/java/com/instaclick/pentaho/plugin/amqp/processor/AbstractProcessorTest.java b/src/test/java/com/instaclick/pentaho/plugin/amqp/processor/AbstractProcessorTest.java index 0de1b0a..4be80c6 100644 --- a/src/test/java/com/instaclick/pentaho/plugin/amqp/processor/AbstractProcessorTest.java +++ b/src/test/java/com/instaclick/pentaho/plugin/amqp/processor/AbstractProcessorTest.java @@ -1,30 +1,36 @@ package com.instaclick.pentaho.plugin.amqp.processor; -import com.instaclick.pentaho.plugin.amqp.initializer.Initializer; import com.instaclick.pentaho.plugin.amqp.AMQPPlugin; import com.instaclick.pentaho.plugin.amqp.AMQPPluginData; +import com.instaclick.pentaho.plugin.amqp.initializer.Initializer; import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.junit.Before; +import org.junit.Test; +import org.pentaho.di.core.exception.KettleStepException; +import org.pentaho.di.core.row.RowMetaInterface; + import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.junit.Test; -import static org.junit.Assert.*; -import org.junit.Before; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.mockito.Mockito.*; -import org.pentaho.di.core.exception.KettleStepException; -import org.pentaho.di.core.row.RowMetaInterface; public class AbstractProcessorTest { AMQPPluginData data; AMQPPlugin plugin; Channel channel; + Connection connection; @Before public void setUp() { channel = mock(Channel.class, RETURNS_MOCKS); + connection = mock(Connection.class, RETURNS_MOCKS); data = mock(AMQPPluginData.class); plugin = mock(AMQPPlugin.class); } @@ -120,8 +126,10 @@ public void testShutdownClosesChannel() throws IOException final BaseProcessor processor = new AbstractProcessorImpl(channel, plugin, data); when(channel.isOpen()).thenReturn(true); + when(channel.getConnection()).thenReturn(connection); processor.shutdown(); verify(channel, times(1)).getConnection(); + verify(connection, times(1)).close(); } @Test From 51b81db40c0efb9a3294fb9cea29c65edac4d202 Mon Sep 17 00:00:00 2001 From: Denis Shvedchenko Date: Sat, 30 Apr 2016 00:45:22 +0300 Subject: [PATCH 4/5] issue31: check that connection is opened before attempt --- .../plugin/amqp/processor/BaseProcessor.java | 15 ++++++++++++--- .../amqp/processor/AbstractProcessorTest.java | 1 + 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java index eab3ebb..1a11fc2 100644 --- a/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java +++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/processor/BaseProcessor.java @@ -1,12 +1,14 @@ package com.instaclick.pentaho.plugin.amqp.processor; -import com.instaclick.pentaho.plugin.amqp.initializer.Initializer; import com.instaclick.pentaho.plugin.amqp.AMQPPlugin; import com.instaclick.pentaho.plugin.amqp.AMQPPluginData; +import com.instaclick.pentaho.plugin.amqp.initializer.Initializer; import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.pentaho.di.core.exception.KettleStepException; + import java.io.IOException; import java.util.List; -import org.pentaho.di.core.exception.KettleStepException; abstract class BaseProcessor implements Processor { @@ -79,7 +81,14 @@ protected void logUndefinedRow(final Object[] r, final String log, final String public void shutdown() throws IOException { if (channel.isOpen()) { - channel.getConnection().close(); + channel.close(); + } + + if (channel != null) { + Connection connection = channel.getConnection(); + if (connection.isOpen()) { + connection.close(); + } } } diff --git a/src/test/java/com/instaclick/pentaho/plugin/amqp/processor/AbstractProcessorTest.java b/src/test/java/com/instaclick/pentaho/plugin/amqp/processor/AbstractProcessorTest.java index 4be80c6..50de03d 100644 --- a/src/test/java/com/instaclick/pentaho/plugin/amqp/processor/AbstractProcessorTest.java +++ b/src/test/java/com/instaclick/pentaho/plugin/amqp/processor/AbstractProcessorTest.java @@ -127,6 +127,7 @@ public void testShutdownClosesChannel() throws IOException when(channel.isOpen()).thenReturn(true); when(channel.getConnection()).thenReturn(connection); + when(connection.isOpen()).thenReturn(true); processor.shutdown(); verify(channel, times(1)).getConnection(); verify(connection, times(1)).close(); From 8ea7380c14d1b207ae034d57cb5729d214a22b16 Mon Sep 17 00:00:00 2001 From: Denis Shvedchenko Date: Sat, 30 Apr 2016 00:52:38 +0300 Subject: [PATCH 5/5] issue31: alos not use port value from field if it is null --- .../pentaho/plugin/amqp/AMQPPlugin.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPlugin.java b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPlugin.java index 0f9fda6..2713227 100644 --- a/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPlugin.java +++ b/src/main/java/com/instaclick/pentaho/plugin/amqp/AMQPPlugin.java @@ -1,23 +1,21 @@ package com.instaclick.pentaho.plugin.amqp; -import java.io.IOException; +import com.instaclick.pentaho.plugin.amqp.processor.Processor; +import com.instaclick.pentaho.plugin.amqp.processor.ProcessorFactory; +import org.pentaho.di.core.Const; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleStepException; +import org.pentaho.di.core.row.RowMeta; +import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.trans.Trans; +import org.pentaho.di.trans.TransListener; import org.pentaho.di.trans.TransMeta; -import org.pentaho.di.trans.step.BaseStep; -import org.pentaho.di.trans.step.StepDataInterface; -import org.pentaho.di.trans.step.StepInterface; -import org.pentaho.di.trans.step.StepMeta; -import org.pentaho.di.trans.step.StepMetaInterface; +import org.pentaho.di.trans.step.*; + +import java.io.IOException; + import static com.instaclick.pentaho.plugin.amqp.Messages.getString; -import com.instaclick.pentaho.plugin.amqp.processor.Processor; -import com.instaclick.pentaho.plugin.amqp.processor.ProcessorFactory; import static org.pentaho.di.core.encryption.KettleTwoWayPasswordEncoder.decryptPasswordOptionallyEncrypted; -import org.pentaho.di.core.Const; -import org.pentaho.di.core.row.RowMeta; -import org.pentaho.di.trans.TransListener; -import org.pentaho.di.core.row.RowMetaInterface; public class AMQPPlugin extends BaseStep implements StepInterface { @@ -171,7 +169,9 @@ private void initPluginStep() throws Exception data.uri = uri; data.host = host; - data.port = port; + if (port != null) { + data.port = port; + } data.vhost = vhost; data.username = username; data.password = password;