Skip to content

Commit

Permalink
take never retains
Browse files Browse the repository at this point in the history
  • Loading branch information
gregw committed May 17, 2024
1 parent ad75635 commit fda044d
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
Expand Down Expand Up @@ -521,20 +523,18 @@ public void testRequestWithContinuationFramesWithEmptyContinuationFrame() throws
generator.control(accumulator, new PrefaceFrame());
generator.control(accumulator, new SettingsFrame(new HashMap<>(), false));
MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY);

long offset = accumulator.size();
generator.control(accumulator, new HeadersFrame(1, metaData, null, true));

// Take the ContinuationFrame header, duplicate it, and set the length to zero.
/* TODO
List<ByteBuffer> buffers = accumulator.getByteBuffers();
ByteBuffer continuationFrameHeader = buffers.get(4);
ByteBuffer duplicate = ByteBuffer.allocate(continuationFrameHeader.remaining());
duplicate.put(continuationFrameHeader).flip();
continuationFrameHeader.flip();
continuationFrameHeader.put(0, (byte)0);
continuationFrameHeader.putShort(1, (short)0);
// Insert a CONTINUATION frame header for the body of the previous CONTINUATION frame.
accumulator.add(RetainableByteBuffer.wrap(duplicate));
*/
RetainableByteBuffer headers = accumulator.take(offset);
RetainableByteBuffer.Mutable continuation = headers.copy().asMutable();
accumulator.add(headers);
continuation.limit(9);
continuation.put(0, (byte)0x00);
continuation.put(1, (byte)0x00);
continuation.put(2, (byte)0x00);
accumulator.add(continuation);
return accumulator;
});
}
Expand All @@ -548,22 +548,40 @@ public void testRequestWithContinuationFramesWithEmptyLastContinuationFrame() th
generator.control(accumulator, new PrefaceFrame());
generator.control(accumulator, new SettingsFrame(new HashMap<>(), false));
MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY);

long offset = accumulator.size();
generator.control(accumulator, new HeadersFrame(1, metaData, null, true));
/* TODO

RetainableByteBuffer headers = accumulator.take(offset);
System.err.println(accumulator.toDetailString());
System.err.println(headers.toDetailString());

// Take the last CONTINUATION frame and reset the flag.
List<ByteBuffer> buffers = accumulator.getByteBuffers();
ByteBuffer continuationFrameHeader = buffers.get(buffers.size() - 2);
continuationFrameHeader.put(4, (byte)0);
offset = 0;
while (true)
{
int length = ((headers.get(offset) & 0xFF) << 16) + ((headers.get(offset + 1) & 0xFF) << 8) + (headers.get(offset + 2) & 0xFF);
byte flag = headers.get(offset + 4);
if (flag == 0x04)
{
RetainableByteBuffer.Mutable last = headers.take(offset).asMutable();
accumulator.add(headers);
last.put(4, (byte)0);
accumulator.add(last);
break;
}
offset += 9 + length;
}

// Add a last, empty, CONTINUATION frame.
ByteBuffer last = ByteBuffer.wrap(new byte[]{
0, 0, 0, // Length
(byte)FrameType.CONTINUATION.getType(),
(byte)Flags.END_HEADERS,
0, 0, 0, 1 // Stream ID
});
accumulator.append(RetainableByteBuffer.wrap(last));
accumulator.add(
ByteBuffer.wrap(new byte[]{
0, 0, 0, // Length
(byte)FrameType.CONTINUATION.getType(),
(byte)Flags.END_HEADERS,
0, 0, 0, 1 // Stream ID
}));

*/
return accumulator;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;

import org.eclipse.jetty.util.Blocker;
Expand Down Expand Up @@ -144,7 +145,7 @@ default Mutable asMutable() throws ReadOnlyBufferException
throw new ReadOnlyBufferException();
if (this instanceof Mutable mutable)
return mutable;
return new FixedCapacity(getByteBuffer(), this);
throw new ReadOnlyBufferException();
}

/**
Expand Down Expand Up @@ -377,7 +378,8 @@ default RetainableByteBuffer slice(long length)

/**
* Take the contents of this buffer from an index.
* @return A buffer with the contents of this buffer from the index, avoiding copies if possible.
* @return A buffer with the contents of this buffer from the index, avoiding copies if possible, but with
* no shared internal buffers.
*/
default RetainableByteBuffer take(long fromIndex)
{
Expand All @@ -386,7 +388,9 @@ default RetainableByteBuffer take(long fromIndex)
RetainableByteBuffer slice = slice();
limit(fromIndex);
slice.skip(fromIndex);
return slice;
RetainableByteBuffer copy = slice.copy();
slice.release();
return copy;
}

/**
Expand Down Expand Up @@ -1436,23 +1440,39 @@ public RetainableByteBuffer take(long fromIndex)
List<RetainableByteBuffer> buffers = new ArrayList<>(_buffers.size());
_aggregate = null;

for (Iterator<RetainableByteBuffer> i = _buffers.iterator(); i.hasNext();)
for (ListIterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();)
{
RetainableByteBuffer buffer = i.next();

long size = buffer.size();
if (fromIndex >= size)
{
// the sub buffer stays with this RBB
fromIndex -= size;
}
else if (fromIndex == 0)
{
// the sub buffer is added to the new RBB
i.remove();
buffers.add(buffer);
}
else
{
buffers.add(buffer.take(fromIndex));
// the sub buffer is split between this RBB and the new RBB
if (fromIndex > (buffer.size() / 2))
{
// copy only the small part at the end
buffers.add(buffer.take(fromIndex));
}
else
{
// copy only the small part at the beginning
RetainableByteBuffer slice = buffer.slice();
slice.limit(fromIndex);
i.set(slice.copy());
buffer.skip(fromIndex);
buffers.add(buffer);
}
fromIndex = 0;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1237,17 +1237,21 @@ public void testTake(Mutable buffer)
public void testTakeFrom(Mutable buffer)
{
buffer.put("Hello".getBytes(StandardCharsets.UTF_8));
buffer.put((byte)' ');
CountDownLatch released = new CountDownLatch(1);
buffer.add(RetainableByteBuffer.wrap(BufferUtil.toBuffer("cruel ".getBytes(StandardCharsets.UTF_8)), released::countDown));
buffer.add(RetainableByteBuffer.wrap(BufferUtil.toBuffer(" cruel ".getBytes(StandardCharsets.UTF_8)), released::countDown));
buffer.add(RetainableByteBuffer.wrap(BufferUtil.toBuffer("world!".getBytes(StandardCharsets.UTF_8)), released::countDown));
RetainableByteBuffer space = buffer.take(5);

RetainableByteBuffer bang = space.take(space.size() - 1);
RetainableByteBuffer cruelWorld = space.take(1);

assertThat(BufferUtil.toString(buffer.getByteBuffer()), is("Hello"));
assertThat(BufferUtil.toString(space.getByteBuffer()), is(" "));
assertThat(BufferUtil.toString(cruelWorld.getByteBuffer()), is("cruel world!"));
assertThat(BufferUtil.toString(cruelWorld.getByteBuffer()), is("cruel world"));
assertThat(BufferUtil.toString(bang.getByteBuffer()), is("!"));
space.release();
cruelWorld.release();
bang.release();
assertTrue(buffer.release());
}

Expand Down

0 comments on commit fda044d

Please sign in to comment.