Skip to content

Commit

Permalink
feat(multichannel): Implement asynchronous read/write on the same con…
Browse files Browse the repository at this point in the history
…nection
  • Loading branch information
velitasali committed Sep 11, 2023
1 parent 4d1c056 commit 2400a62
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.logging.Logger;

import static org.monora.coolsocket.core.config.Config.*;

Expand All @@ -43,6 +42,8 @@ public class ActiveConnection implements Closeable

private boolean closeRequested;

private boolean multichannel = false;

private boolean roaming = false;

/**
Expand Down Expand Up @@ -353,7 +354,7 @@ public void handleProtocolRequest(@NotNull Description description, boolean writ
protocolRequest = ProtocolRequest.Close;
else if (cancelled())
protocolRequest = ProtocolRequest.Cancel;
else if (protocolVersion == 0) {
else if (protocolVersion == 0 && !multichannel) {
protocolRequest = ProtocolRequest.InfoExchange;
exchange = InfoExchange.ProtocolVersion;
} else
Expand Down Expand Up @@ -397,6 +398,37 @@ else if (protocolVersion == 0) {
handleProtocolRequest(description, write);
}

/**
* Check whether the multichannel mode is enabled.
* <p>
* In this mode, the write and read streams are separated allowing asynchronous data transaction to be possible.
* <p>
* When enabled, the inverse exchange point is not handled, only the readers is able {@link #readState(Description)},
* and only writers is able to {@link #writeState(Description)}. That way two write and read channels can be used
* independently of each other.
*
* @return True if the multichannel mode is enabled.
* @see #setMultichannel(boolean)
*/
public boolean isMultichannel()
{
return multichannel;
}

/**
* Enable/disable the multichannel mode.
* <p>
* Changing the mode does not send a signal to the remote host, and both side has to enable/disable it at the same
* time.
*
* @param multichannel True to enable the multichannel mode.
* @see #isMultichannel()
*/
public void setMultichannel(boolean multichannel)
{
this.multichannel = multichannel;
}

/**
* Check whether the roaming state is activated for this instance.
* <p>
Expand Down Expand Up @@ -440,11 +472,12 @@ public int read(@NotNull Description description) throws IOException
boolean chunked = description.flags.chunked();

if (description.nextAvailable <= 0) {
if (description.transactionCount++ == description.inverseExchangePoint) {
if (!multichannel && description.transactionCount++ == description.inverseExchangePoint) {
writeState(description);
description.transactionCount = 0;
} else
} else {
readState(description);
}
readOrFail(description.byteBuffer, Long.BYTES);
description.nextAvailable = description.byteBuffer.getLong();

Expand Down Expand Up @@ -510,8 +543,12 @@ public int read(@NotNull Description description) throws IOException
{
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);

byteBuffer.putInt(inverseExchangePoint).flip();
getWritableByteChannel().write(byteBuffer);
if (multichannel) {
inverseExchangePoint = DEFAULT_INVERSE_EXCHANGE_POINT;
} else {
byteBuffer.putInt(inverseExchangePoint).flip();
getWritableByteChannel().write(byteBuffer);
}

readOrFail(byteBuffer, Long.BYTES * 2 + Integer.BYTES);

Expand All @@ -521,7 +558,7 @@ public int read(@NotNull Description description) throws IOException
nextOperationId = description.operationId;

readState(description);
writeState(description);
if (!multichannel) writeState(description);

return description;
}
Expand Down Expand Up @@ -847,7 +884,7 @@ public synchronized void write(@NotNull Description description, byte @NotNull [
description.available(), consume);
}

if (description.transactionCount++ == description.inverseExchangePoint) {
if (!multichannel && description.transactionCount++ == description.inverseExchangePoint) {
readState(description);
description.transactionCount = 0;
} else
Expand Down Expand Up @@ -915,9 +952,15 @@ public synchronized void write(@NotNull Description description, @NotNull InputS
ByteBuffer byteBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
int operationId = ++nextOperationId;

readOrFail(byteBuffer, Integer.BYTES);
int inverseExchangePoint = byteBuffer.getInt();
byteBuffer.clear();
int inverseExchangePoint;

if (multichannel) {
inverseExchangePoint = DEFAULT_INVERSE_EXCHANGE_POINT;
} else {
readOrFail(byteBuffer, Integer.BYTES);
inverseExchangePoint = byteBuffer.getInt();
byteBuffer.clear();
}

Description description = new Description(flags, operationId, totalLength, inverseExchangePoint, byteBuffer);
byteBuffer.putLong(flags)
Expand All @@ -929,7 +972,7 @@ public synchronized void write(@NotNull Description description, @NotNull InputS
byteBuffer.clear();

writeState(description);
readState(description);
if (!multichannel) readState(description);

return description;
}
Expand Down
56 changes: 56 additions & 0 deletions src/test/java/org/monora/coolsocket/core/CommandExecutionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.junit.Assert;
import org.junit.Test;
import org.monora.coolsocket.core.config.Config;
import org.monora.coolsocket.core.response.Response;
import org.monora.coolsocket.core.session.ActiveConnection;
import org.monora.coolsocket.core.session.CancelledException;
import org.monora.coolsocket.core.session.ClosedException;
Expand Down Expand Up @@ -310,4 +311,59 @@ public void onConnected(@NotNull ActiveConnection activeConnection)
coolSocket.stop();
}
}

@Test
public void readerCancellingIgnoredWhenMultichannelEnabled() throws InterruptedException, IOException
{
CoolSocket coolSocket = new DefaultCoolSocket()
{
@Override
public void onConnected(@NotNull ActiveConnection activeConnection)
{
activeConnection.setMultichannel(true);
try {
activeConnection.cancel();
activeConnection.receive();
} catch (IOException e) {
e.printStackTrace();
}
}
};

coolSocket.start();

try (ActiveConnection activeConnection = Connections.connect()) {
activeConnection.setMultichannel(true);
activeConnection.reply("hello");
} finally {
coolSocket.stop();
}
}

@Test(expected = CancelledException.class)
public void throwsAfterSenderCancelsWhenMultichannelEnabled() throws InterruptedException, IOException
{
CoolSocket coolSocket = new DefaultCoolSocket()
{
@Override
public void onConnected(@NotNull ActiveConnection activeConnection)
{
activeConnection.setMultichannel(true);
try {
activeConnection.receive();
} catch (IOException ignored) {
}
}
};

coolSocket.start();

try (ActiveConnection activeConnection = Connections.connect()) {
activeConnection.cancel();
activeConnection.setMultichannel(true);
activeConnection.reply("hello");
} finally {
coolSocket.stop();
}
}
}
34 changes: 34 additions & 0 deletions src/test/java/org/monora/coolsocket/core/PlainTransactionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.junit.Test;
import org.monora.coolsocket.core.response.Response;
import org.monora.coolsocket.core.session.ActiveConnection;
import org.monora.coolsocket.core.session.CancelledException;
import org.monora.coolsocket.core.variant.*;

import java.io.IOException;
Expand Down Expand Up @@ -232,4 +233,37 @@ public void nonRoamingChildClosesTest() throws IOException, InterruptedException
coolSocket.stop();
}
}

@Test
public void handlesDisorderlyTransactionsWhenMultichannelEnabled() throws IOException, InterruptedException
{
String message = "Hello, World!";
CoolSocket coolSocket = new DefaultCoolSocket()
{
@Override
public void onConnected(@NotNull ActiveConnection activeConnection)
{
activeConnection.setMultichannel(true);
try {
// Put the read and write in the same order on both the reader and the sender.
activeConnection.reply(message);
activeConnection.receive();
} catch (IOException ignored) {
}
}
};

coolSocket.start();

try (ActiveConnection activeConnection = Connections.connect()) {
activeConnection.setMultichannel(true);

activeConnection.reply(message);
Response response = activeConnection.receive();

Assert.assertEquals("The message should match after the disorderly transaction", message, response.getAsString());
} finally {
coolSocket.stop();
}
}
}

0 comments on commit 2400a62

Please sign in to comment.