diff --git a/src/main/java/org/monora/coolsocket/core/session/ActiveConnection.java b/src/main/java/org/monora/coolsocket/core/session/ActiveConnection.java index 27d7183..d83ad06 100644 --- a/src/main/java/org/monora/coolsocket/core/session/ActiveConnection.java +++ b/src/main/java/org/monora/coolsocket/core/session/ActiveConnection.java @@ -17,7 +17,6 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; -import java.util.logging.Logger; import static org.monora.coolsocket.core.config.Config.*; @@ -43,6 +42,8 @@ public class ActiveConnection implements Closeable private boolean closeRequested; + private boolean multichannel = false; + private boolean roaming = false; /** @@ -353,7 +354,7 @@ public void handleProtocolRequest(@NotNull Description description, boolean writ protocolRequest = ProtocolRequest.Close; else if (cancelled()) protocolRequest = ProtocolRequest.Cancel; - else if (protocolVersion == 0) { + else if (protocolVersion == 0 && !multichannel) { protocolRequest = ProtocolRequest.InfoExchange; exchange = InfoExchange.ProtocolVersion; } else @@ -397,6 +398,37 @@ else if (protocolVersion == 0) { handleProtocolRequest(description, write); } + /** + * Check whether the multichannel mode is enabled. + *
+ * In this mode, the write and read streams are separated allowing asynchronous data transaction to be possible. + *
+ * When enabled, the inverse exchange point is not handled, only the readers is able {@link #readState(Description)}, + * and only writers is able to {@link #writeState(Description)}. That way two write and read channels can be used + * independently of each other. + * + * @return True if the multichannel mode is enabled. + * @see #setMultichannel(boolean) + */ + public boolean isMultichannel() + { + return multichannel; + } + + /** + * Enable/disable the multichannel mode. + *
+ * Changing the mode does not send a signal to the remote host, and both side has to enable/disable it at the same + * time. + * + * @param multichannel True to enable the multichannel mode. + * @see #isMultichannel() + */ + public void setMultichannel(boolean multichannel) + { + this.multichannel = multichannel; + } + /** * Check whether the roaming state is activated for this instance. *
@@ -440,11 +472,12 @@ public int read(@NotNull Description description) throws IOException boolean chunked = description.flags.chunked(); if (description.nextAvailable <= 0) { - if (description.transactionCount++ == description.inverseExchangePoint) { + if (!multichannel && description.transactionCount++ == description.inverseExchangePoint) { writeState(description); description.transactionCount = 0; - } else + } else { readState(description); + } readOrFail(description.byteBuffer, Long.BYTES); description.nextAvailable = description.byteBuffer.getLong(); @@ -510,8 +543,12 @@ public int read(@NotNull Description description) throws IOException { ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); - byteBuffer.putInt(inverseExchangePoint).flip(); - getWritableByteChannel().write(byteBuffer); + if (multichannel) { + inverseExchangePoint = DEFAULT_INVERSE_EXCHANGE_POINT; + } else { + byteBuffer.putInt(inverseExchangePoint).flip(); + getWritableByteChannel().write(byteBuffer); + } readOrFail(byteBuffer, Long.BYTES * 2 + Integer.BYTES); @@ -521,7 +558,7 @@ public int read(@NotNull Description description) throws IOException nextOperationId = description.operationId; readState(description); - writeState(description); + if (!multichannel) writeState(description); return description; } @@ -847,7 +884,7 @@ public synchronized void write(@NotNull Description description, byte @NotNull [ description.available(), consume); } - if (description.transactionCount++ == description.inverseExchangePoint) { + if (!multichannel && description.transactionCount++ == description.inverseExchangePoint) { readState(description); description.transactionCount = 0; } else @@ -915,9 +952,15 @@ public synchronized void write(@NotNull Description description, @NotNull InputS ByteBuffer byteBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); int operationId = ++nextOperationId; - readOrFail(byteBuffer, Integer.BYTES); - int inverseExchangePoint = byteBuffer.getInt(); - byteBuffer.clear(); + int inverseExchangePoint; + + if (multichannel) { + inverseExchangePoint = DEFAULT_INVERSE_EXCHANGE_POINT; + } else { + readOrFail(byteBuffer, Integer.BYTES); + inverseExchangePoint = byteBuffer.getInt(); + byteBuffer.clear(); + } Description description = new Description(flags, operationId, totalLength, inverseExchangePoint, byteBuffer); byteBuffer.putLong(flags) @@ -929,7 +972,7 @@ public synchronized void write(@NotNull Description description, @NotNull InputS byteBuffer.clear(); writeState(description); - readState(description); + if (!multichannel) readState(description); return description; } diff --git a/src/test/java/org/monora/coolsocket/core/CommandExecutionTest.java b/src/test/java/org/monora/coolsocket/core/CommandExecutionTest.java index 7a78eea..25caf41 100644 --- a/src/test/java/org/monora/coolsocket/core/CommandExecutionTest.java +++ b/src/test/java/org/monora/coolsocket/core/CommandExecutionTest.java @@ -4,6 +4,7 @@ import org.junit.Assert; import org.junit.Test; import org.monora.coolsocket.core.config.Config; +import org.monora.coolsocket.core.response.Response; import org.monora.coolsocket.core.session.ActiveConnection; import org.monora.coolsocket.core.session.CancelledException; import org.monora.coolsocket.core.session.ClosedException; @@ -310,4 +311,59 @@ public void onConnected(@NotNull ActiveConnection activeConnection) coolSocket.stop(); } } + + @Test + public void readerCancellingIgnoredWhenMultichannelEnabled() throws InterruptedException, IOException + { + CoolSocket coolSocket = new DefaultCoolSocket() + { + @Override + public void onConnected(@NotNull ActiveConnection activeConnection) + { + activeConnection.setMultichannel(true); + try { + activeConnection.cancel(); + activeConnection.receive(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + + coolSocket.start(); + + try (ActiveConnection activeConnection = Connections.connect()) { + activeConnection.setMultichannel(true); + activeConnection.reply("hello"); + } finally { + coolSocket.stop(); + } + } + + @Test(expected = CancelledException.class) + public void throwsAfterSenderCancelsWhenMultichannelEnabled() throws InterruptedException, IOException + { + CoolSocket coolSocket = new DefaultCoolSocket() + { + @Override + public void onConnected(@NotNull ActiveConnection activeConnection) + { + activeConnection.setMultichannel(true); + try { + activeConnection.receive(); + } catch (IOException ignored) { + } + } + }; + + coolSocket.start(); + + try (ActiveConnection activeConnection = Connections.connect()) { + activeConnection.cancel(); + activeConnection.setMultichannel(true); + activeConnection.reply("hello"); + } finally { + coolSocket.stop(); + } + } } diff --git a/src/test/java/org/monora/coolsocket/core/PlainTransactionTest.java b/src/test/java/org/monora/coolsocket/core/PlainTransactionTest.java index b4915d9..67ec15d 100644 --- a/src/test/java/org/monora/coolsocket/core/PlainTransactionTest.java +++ b/src/test/java/org/monora/coolsocket/core/PlainTransactionTest.java @@ -6,6 +6,7 @@ import org.junit.Test; import org.monora.coolsocket.core.response.Response; import org.monora.coolsocket.core.session.ActiveConnection; +import org.monora.coolsocket.core.session.CancelledException; import org.monora.coolsocket.core.variant.*; import java.io.IOException; @@ -232,4 +233,37 @@ public void nonRoamingChildClosesTest() throws IOException, InterruptedException coolSocket.stop(); } } + + @Test + public void handlesDisorderlyTransactionsWhenMultichannelEnabled() throws IOException, InterruptedException + { + String message = "Hello, World!"; + CoolSocket coolSocket = new DefaultCoolSocket() + { + @Override + public void onConnected(@NotNull ActiveConnection activeConnection) + { + activeConnection.setMultichannel(true); + try { + // Put the read and write in the same order on both the reader and the sender. + activeConnection.reply(message); + activeConnection.receive(); + } catch (IOException ignored) { + } + } + }; + + coolSocket.start(); + + try (ActiveConnection activeConnection = Connections.connect()) { + activeConnection.setMultichannel(true); + + activeConnection.reply(message); + Response response = activeConnection.receive(); + + Assert.assertEquals("The message should match after the disorderly transaction", message, response.getAsString()); + } finally { + coolSocket.stop(); + } + } }