>> queue;
/**
- * Instantiates a new {@link Client} by attempting
- * to open the backing {@link AsynchronousSocketChannel}
- * with a default buffer size of {@code 4096} bytes.
+ * Instantiates a new {@link Client} by attempting to open the backing
+ * {@link AsynchronousSocketChannel} with a default buffer size of {@code 4096} bytes.
*/
public Client() {
this(4096);
}
/**
- * Instantiates a new {@link Client} by attempting
- * to open the backing {@link AsynchronousSocketChannel}
+ * Instantiates a new {@link Client} by attempting to open the backing {@link AsynchronousSocketChannel}
* with a provided buffer size in bytes.
*
* @param bufferSize The size of this {@link Client}'s buffer, in bytes.
@@ -233,8 +225,7 @@ public Client(int bufferSize) {
}
/**
- * Instantiates a new {@link Client} with an existing
- * {@link AsynchronousSocketChannel} with a provided
+ * Instantiates a new {@link Client} with an existing {@link AsynchronousSocketChannel} with a provided
* buffer size in bytes.
*
* @param bufferSize The size of this {@link Client}'s buffer, in bytes.
@@ -293,20 +284,20 @@ public final void connect(String address, int port) {
connect(address, port, 30L, TimeUnit.SECONDS, () -> {
System.err.println("Couldn't connect within 30 seconds!");
});
- }
-
- /**
- * Attempts to connect to a {@link Server} with the specified {@code address} and {@code port}
- * and a specified timeout. If the timeout is reached, then the {@link Runnable} is run and
- * the backing {@link AsynchronousSocketChannel} is closed.
- *
- * @param address The IP address to connect to.
- * @param port The port to connect to {@code 0 <= port <= 65535}.
- * @param timeout The timeout value.
- * @param unit The timeout unit.
- * @param onTimeout The {@link Runnable} that runs if this connection attempt times out.
- */
- public final void connect(String address, int port, long timeout, TimeUnit unit, Runnable onTimeout) {
+ }
+
+ /**
+ * Attempts to connect to a {@link Server} with the specified {@code address} and {@code port}
+ * and a specified timeout. If the timeout is reached, then the {@link Runnable} is run and
+ * the backing {@link AsynchronousSocketChannel} is closed.
+ *
+ * @param address The IP address to connect to.
+ * @param port The port to connect to {@code 0 <= port <= 65535}.
+ * @param timeout The timeout value.
+ * @param unit The timeout unit.
+ * @param onTimeout The {@link Runnable} that runs if this connection attempt times out.
+ */
+ public final void connect(String address, int port, long timeout, TimeUnit unit, Runnable onTimeout) {
Objects.requireNonNull(address);
if (port < 0 || port > 65535) {
@@ -321,7 +312,7 @@ public final void connect(String address, int port, long timeout, TimeUnit unit,
throw new IllegalStateException("This receiver is already connected!");
} catch (ExecutionException e) {
e.printStackTrace();
- } catch (Exception e) {
+ } catch (Exception e) {
onTimeout.run();
close();
}
@@ -334,14 +325,12 @@ public void close() {
}
/**
- * Registers a listener that fires when a {@link Client}
- * disconnects from a {@link Server}.
+ * Registers a listener that fires when a {@link Client} disconnects from a {@link Server}.
*
- * This listener is able to be used by both the {@link Client}
- * and {@link Server}, but can be independent of one-another.
+ * This listener is able to be used by both the {@link Client} and {@link Server}, but can
+ * be independent of one-another.
*
- * When calling this method more than once, multiple listeners
- * are registered.
+ * Calling this method more than once registers multiple listeners.
*
* @param listener A {@link Runnable}.
*/
@@ -350,8 +339,8 @@ public void onDisconnect(Runnable listener) {
}
/**
- * Requests {@code n} bytes and accepts a {@link Consumer}
- * holding the {@code n} bytes once received.
+ * Requests {@code n} bytes and accepts a {@link Consumer} holding the {@code n}
+ * bytes once received.
*
* @param n The number of bytes requested.
* @param consumer A {@link Consumer}.
@@ -371,11 +360,9 @@ public final void read(int n, Consumer consumer) {
}
/**
- * Calls {@link #read(int, Consumer)}, however once
- * finished, {@link #read(int, Consumer)} is called once
- * again with the same parameters; this loops indefinitely
- * whereas {@link #read(int, Consumer)} completes after
- * a single iteration.
+ * Calls {@link #read(int, Consumer)}, however once finished, {@link #read(int, Consumer)}
+ * is called once again with the same parameters; this loops indefinitely whereas
+ * {@link #read(int, Consumer)} completes after a single iteration.
*
* @param n The number of bytes requested.
* @param consumer Holds the operations that should be performed once
@@ -390,37 +377,37 @@ public void accept(ByteBuffer buffer) {
}
});
}
-
- /**
- * A helper method to block until the {@link CompletableFuture} contains a value.
- *
- * @param future The {@link CompletableFuture} to wait for.
- * @param The type of the {@link CompletableFuture} and the return type.
- * @return The instance of {@code T} contained in the {@link CompletableFuture}.
- */
- private T read(CompletableFuture future) {
- try {
- return future.get();
- } catch (Exception e) {
- throw new IllegalStateException("Blocking operation was cancelled!");
- }
- }
-
- /**
- * Reads a {@code byte} from the network, but blocks the executing thread
- * unlike {@link #readByte(Consumer)}.
- *
- * @return A {@code byte}.
- */
- public final byte readByte() {
+
+ /**
+ * A helper method to block until the {@link CompletableFuture} contains a value.
+ *
+ * @param future The {@link CompletableFuture} to wait for.
+ * @param The type of the {@link CompletableFuture} and the return type.
+ * @return The instance of {@code T} contained in the {@link CompletableFuture}.
+ */
+ private T read(CompletableFuture future) {
+ try {
+ return future.get();
+ } catch (Exception e) {
+ throw new IllegalStateException("Blocking operation was cancelled!");
+ }
+ }
+
+ /**
+ * Reads a {@code byte} from the network, but blocks the executing thread unlike
+ * {@link #readByte(Consumer)}.
+ *
+ * @return A {@code byte}.
+ */
+ public final byte readByte() {
CompletableFuture future = new CompletableFuture<>();
- readByte(future::complete);
+ readByte(future::complete);
return read(future);
}
/**
- * Requests a single {@code byte} and accepts a {@link Consumer}
- * with the {@code byte} when it is received.
+ * Requests a single {@code byte} and accepts a {@link Consumer} with the {@code byte}
+ * when it is received.
*
* @param consumer A {@link Consumer}.
*/
@@ -429,33 +416,31 @@ public final void readByte(Consumer consumer) {
}
/**
- * Calls {@link #readByte(Consumer)}, however once
- * finished, {@link #readByte(Consumer)} is called once
- * again with the same parameter; this loops indefinitely
- * whereas {@link #readByte(Consumer)} completes after
- * a single iteration.
+ * Calls {@link #readByte(Consumer)}, however once finished, {@link #readByte(Consumer)}
+ * is called once again with the same parameter; this loops indefinitely whereas
+ * {@link #readByte(Consumer)} completes after a single iteration.
*
* @param consumer A {@link Consumer}.
*/
public final void readByteAlways(Consumer consumer) {
readAlways(Byte.BYTES, buffer -> consumer.accept(buffer.get()));
- }
-
- /**
- * Reads a {@code char} from the network, but blocks the executing thread
- * unlike {@link #readChar(Consumer)}.
- *
- * @return A {@code char}.
- */
- public final char readChar() {
- CompletableFuture future = new CompletableFuture<>();
- readChar(future::complete);
- return read(future);
- }
-
- /**
- * Requests a single {@code char} and accepts a {@link Consumer}
- * with the {@code char} when it is received.
+ }
+
+ /**
+ * Reads a {@code char} from the network, but blocks the executing thread unlike
+ * {@link #readChar(Consumer)}.
+ *
+ * @return A {@code char}.
+ */
+ public final char readChar() {
+ CompletableFuture future = new CompletableFuture<>();
+ readChar(future::complete);
+ return read(future);
+ }
+
+ /**
+ * Requests a single {@code char} and accepts a {@link Consumer} with the {@code char}
+ * when it is received.
*
* @param consumer A {@link Consumer}.
*/
@@ -474,20 +459,20 @@ public final void readChar(Consumer consumer) {
*/
public final void readCharAlways(Consumer consumer) {
readAlways(Character.BYTES, buffer -> consumer.accept(buffer.getChar()));
- }
-
- /**
- * Reads a {@code double} from the network, but blocks the executing thread
- * unlike {@link #readDouble(DoubleConsumer)}.
- *
- * @return A {@code double}.
- */
- public final double readDouble() {
- CompletableFuture future = new CompletableFuture<>();
- readDouble(future::complete);
- return read(future);
- }
-
+ }
+
+ /**
+ * Reads a {@code double} from the network, but blocks the executing thread
+ * unlike {@link #readDouble(DoubleConsumer)}.
+ *
+ * @return A {@code double}.
+ */
+ public final double readDouble() {
+ CompletableFuture future = new CompletableFuture<>();
+ readDouble(future::complete);
+ return read(future);
+ }
+
/**
* Requests a single {@code double} and accepts a {@link Consumer}
* with the {@code double} when it is received.
@@ -509,20 +494,20 @@ public final void readDouble(DoubleConsumer consumer) {
*/
public final void readDoubleAlways(DoubleConsumer consumer) {
readAlways(Double.BYTES, buffer -> consumer.accept(buffer.getDouble()));
- }
-
- /**
- * Reads a {@code float} from the network, but blocks the executing thread
- * unlike {@link #readFloat(Consumer)}.
- *
- * @return A {@code float}.
- */
- public final float readFloat() {
- CompletableFuture future = new CompletableFuture<>();
- readFloat(future::complete);
- return read(future);
- }
-
+ }
+
+ /**
+ * Reads a {@code float} from the network, but blocks the executing thread
+ * unlike {@link #readFloat(Consumer)}.
+ *
+ * @return A {@code float}.
+ */
+ public final float readFloat() {
+ CompletableFuture future = new CompletableFuture<>();
+ readFloat(future::complete);
+ return read(future);
+ }
+
/**
* Requests a single {@code float} and accepts a {@link Consumer}
* with the {@code float} when it is received.
@@ -543,21 +528,21 @@ public final void readFloat(Consumer consumer) {
* @param consumer A {@link Consumer}.
*/
public final void readFloatAlways(Consumer consumer) {
- readAlways(Float.BYTES, buffer -> consumer.accept(buffer.getFloat()));
- }
-
- /**
- * Reads an {@code int} from the network, but blocks the executing thread
- * unlike {@link #readInt(IntConsumer)}.
- *
- * @return An {@code int}.
- */
- public final int readInt() {
- CompletableFuture future = new CompletableFuture<>();
- readInt(future::complete);
- return read(future);
- }
-
+ readAlways(Float.BYTES, buffer -> consumer.accept(buffer.getFloat()));
+ }
+
+ /**
+ * Reads an {@code int} from the network, but blocks the executing thread
+ * unlike {@link #readInt(IntConsumer)}.
+ *
+ * @return An {@code int}.
+ */
+ public final int readInt() {
+ CompletableFuture future = new CompletableFuture<>();
+ readInt(future::complete);
+ return read(future);
+ }
+
/**
* Requests a single {@code int} and accepts a {@link Consumer}
* with the {@code int} when it is received.
@@ -578,21 +563,21 @@ public final void readInt(IntConsumer consumer) {
* @param consumer An {@link IntConsumer}.
*/
public final void readIntAlways(IntConsumer consumer) {
- readAlways(Integer.BYTES, buffer -> consumer.accept(buffer.getInt()));
- }
-
- /**
- * Reads a {@code long} from the network, but blocks the executing thread
- * unlike {@link #readLong(LongConsumer)}.
- *
- * @return A {@code long}.
- */
- public final long readLong() {
- CompletableFuture future = new CompletableFuture<>();
- readLong(future::complete);
- return read(future);
- }
-
+ readAlways(Integer.BYTES, buffer -> consumer.accept(buffer.getInt()));
+ }
+
+ /**
+ * Reads a {@code long} from the network, but blocks the executing thread
+ * unlike {@link #readLong(LongConsumer)}.
+ *
+ * @return A {@code long}.
+ */
+ public final long readLong() {
+ CompletableFuture future = new CompletableFuture<>();
+ readLong(future::complete);
+ return read(future);
+ }
+
/**
* Requests a single {@code long} and accepts a {@link Consumer}
* with the {@code long} when it is received.
@@ -613,21 +598,21 @@ public final void readLong(LongConsumer consumer) {
* @param consumer A {@link LongConsumer}.
*/
public final void readLongAlways(LongConsumer consumer) {
- readAlways(Long.BYTES, buffer -> consumer.accept(buffer.getLong()));
- }
-
- /**
- * Reads a {@code short} from the network, but blocks the executing thread
- * unlike {@link #readShort(Consumer)}.
- *
- * @return A {@code short}.
- */
- public final short readShort() {
- CompletableFuture future = new CompletableFuture<>();
- readShort(future::complete);
- return read(future);
- }
-
+ readAlways(Long.BYTES, buffer -> consumer.accept(buffer.getLong()));
+ }
+
+ /**
+ * Reads a {@code short} from the network, but blocks the executing thread
+ * unlike {@link #readShort(Consumer)}.
+ *
+ * @return A {@code short}.
+ */
+ public final short readShort() {
+ CompletableFuture future = new CompletableFuture<>();
+ readShort(future::complete);
+ return read(future);
+ }
+
/**
* Requests a single {@code short} and accepts a {@link Consumer}
* with the {@code short} when it is received.
@@ -648,21 +633,21 @@ public final void readShort(Consumer consumer) {
* @param consumer A {@link Consumer}.
*/
public final void readShortAlways(Consumer consumer) {
- readAlways(Short.BYTES, buffer -> consumer.accept(buffer.getShort()));
- }
-
- /**
- * Reads a {@link String} from the network, but blocks the executing thread
- * unlike {@link #readString(Consumer)}.
- *
- * @return A {@code String}.
- */
- public final String readString() {
- CompletableFuture future = new CompletableFuture<>();
- readString(future::complete);
- return read(future);
- }
-
+ readAlways(Short.BYTES, buffer -> consumer.accept(buffer.getShort()));
+ }
+
+ /**
+ * Reads a {@link String} from the network, but blocks the executing thread
+ * unlike {@link #readString(Consumer)}.
+ *
+ * @return A {@code String}.
+ */
+ public final String readString() {
+ CompletableFuture future = new CompletableFuture<>();
+ readString(future::complete);
+ return read(future);
+ }
+
/**
* Requests a single {@link String} and accepts a {@link Consumer}
* with the {@link String} when it is received. A {@code short}
@@ -707,54 +692,39 @@ public final void readStringAlways(Consumer consumer) {
* it is called again.
*/
public final synchronized void flush() {
- for (int i = 0; i < outgoingPackets.size(); i++) {
- ByteBuffer raw = outgoingPackets.poll();
-
- if (encryption != null) {
- try {
- encryption.update(raw, raw.duplicate());
- raw.flip();
- } catch (Exception e) {
- throw new IllegalStateException("Exception occurred when encrypting:", e);
- }
- }
+ ByteBuffer raw = ByteBuffer.allocateDirect(outgoingPackets.stream().mapToInt(Packet::getSize).sum());
- if (!writing.getAndSet(true)) {
- channel.write(raw, this, PACKET_HANDLER);
- } else {
- packetsToFlush.offer(raw);
- }
+ Packet packet;
+
+ while ((packet = outgoingPackets.poll()) != null) {
+ packet.getQueue().forEach(consumer -> consumer.accept(raw));
}
- }
- /**
- * Gets the {@link Deque} that holds information
- * regarding requested bytes by this {@link Client}.
- *
- * @return A {@link Deque}.
- */
- private Deque>> getQueue() {
- return queue;
- }
+ raw.flip();
- /**
- * Gets the {@link Deque} that keeps track of nested
- * calls to {@link Client#read(int, Consumer)}.
- *
- * @return A {@link Deque}.
- */
- private Deque>> getStack() {
- return stack;
+ if (encryption != null) {
+ try {
+ encryption.update(raw, raw.duplicate());
+ raw.flip();
+ } catch (Exception e) {
+ throw new IllegalStateException("Exception occurred when encrypting:", e);
+ }
+ }
+
+ if (!writing.getAndSet(true)) {
+ channel.write(raw, this, PACKET_HANDLER);
+ } else {
+ packetsToFlush.offer(raw);
+ }
}
/**
- * Gets the {@link Queue} that manages outgoing
- * {@link Packet}s before writing them to the
+ * Gets the {@link Queue} that manages outgoing {@link Packet}s before writing them to the
* {@link Channel}.
*
* @return A {@link Queue}.
*/
- public Queue getOutgoingPackets() {
+ public Queue getOutgoingPackets() {
return outgoingPackets;
}
diff --git a/src/main/java/simplenet/packet/Packet.java b/src/main/java/simplenet/packet/Packet.java
index 2e5b395..26690d7 100644
--- a/src/main/java/simplenet/packet/Packet.java
+++ b/src/main/java/simplenet/packet/Packet.java
@@ -208,11 +208,9 @@ public Packet putString(String s) {
* or more of the headers depend on the size of the data
* contained within the {@link Packet} itself.
*
- * @param runnable
- * The {@link Runnable} containing calls to add
- * more data to this {@link Packet}.
- * @return
- * The {@link Packet} to allow for chained writes.
+ * @param runnable The {@link Runnable} containing calls to add
+ * more data to this {@link Packet}.
+ * @return The {@link Packet} to allow for chained writes.
*/
public Packet prepend(Runnable runnable) {
prepend = true;
@@ -221,19 +219,6 @@ public Packet prepend(Runnable runnable) {
return this;
}
- /**
- * Builds this {@link Packet}'s data into a {@link ByteBuffer}
- * for use in {@link #write(Client...)} and {@link #writeAndFlush(Client...)}.
- *
- * @return
- * A {@link ByteBuffer}.
- */
- private ByteBuffer build() {
- ByteBuffer buffer = ByteBuffer.allocateDirect(size);
- queue.forEach(consumer -> consumer.accept(buffer));
- return (ByteBuffer) buffer.flip();
- }
-
/**
* Queues this {@link Packet} to one (or more) {@link Client}(s).
*
@@ -247,10 +232,8 @@ public final void write(Client... clients) {
throw new IllegalArgumentException("You must send this packet to at least one channel!");
}
- ByteBuffer payload = build();
-
for (Client client : clients) {
- client.getOutgoingPackets().offer(payload);
+ client.getOutgoingPackets().offer(this);
}
}
@@ -266,10 +249,8 @@ public final void writeAndFlush(Client... clients) {
throw new IllegalArgumentException("You must send this packet to at least one channel!");
}
- ByteBuffer payload = build();
-
for (Client client : clients) {
- client.getOutgoingPackets().offer(payload);
+ client.getOutgoingPackets().offer(this);
client.flush();
}
}
@@ -277,11 +258,19 @@ public final void writeAndFlush(Client... clients) {
/**
* Gets the number of bytes in this {@link Packet}'s payload.
*
- * @return
- * The current size of this {@link Packet} measured in bytes.
+ * @return The current size of this {@link Packet} measured in bytes.
*/
public int getSize() {
return size;
}
+ /**
+ * Gets the backing queue of this {@link Packet}.
+ *
+ * @return A {@link Deque>}.
+ */
+ public Deque> getQueue() {
+ return queue;
+ }
+
}