Skip to content

Commit

Permalink
RetainableByteBuffer as mutable (#11801)
Browse files Browse the repository at this point in the history
Tweaks to the RBB API to make the concept more uniform throughout the codebase.

* Make chunk a RBB
* Added Dynamic RBB as a replacement for both Accumulator and Aggregator

---------

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Co-authored-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
gregw and lorban authored Jun 24, 2024
1 parent 0cb10d3 commit 36538d6
Show file tree
Hide file tree
Showing 134 changed files with 5,534 additions and 1,561 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ private void registerDemand(ContentSource contentSource)

private class ContentSource implements Content.Source
{
private static final Content.Chunk ALREADY_READ_CHUNK = new Content.Chunk()
private static final Content.Chunk ALREADY_READ_CHUNK = new Content.Chunk.Empty()
{
@Override
public ByteBuffer getByteBuffer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private boolean parseAndFill()
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("Parsing {} in {}", BufferUtil.toDetailString(networkBuffer.getByteBuffer()), this);
LOG.debug("Parsing {} in {}", networkBuffer, this);
// Always parse even empty buffers to advance the parser.
if (parse())
{
Expand Down Expand Up @@ -347,7 +347,7 @@ private boolean parse()
if (getHttpChannel().isTunnel(method, status))
return true;

if (!networkBuffer.hasRemaining())
if (networkBuffer.isEmpty())
return false;

if (!HttpStatus.isInformational(status))
Expand All @@ -359,7 +359,7 @@ private boolean parse()
return false;
}

if (!networkBuffer.hasRemaining())
if (networkBuffer.isEmpty())
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ public void dispose() throws Exception
{
LifeCycle.stop(client);
LifeCycle.stop(server);
Set<ArrayByteBufferPool.Tracking.Buffer> serverLeaks = serverBufferPool.getLeaks();
Set<ArrayByteBufferPool.Tracking.TrackedBuffer> serverLeaks = serverBufferPool.getLeaks();
assertEquals(0, serverLeaks.size(), serverBufferPool.dumpLeaks());
Set<ArrayByteBufferPool.Tracking.Buffer> clientLeaks = clientBufferPool.getLeaks();
Set<ArrayByteBufferPool.Tracking.TrackedBuffer> clientLeaks = clientBufferPool.getLeaks();
assertEquals(0, clientLeaks.size(), clientBufferPool.dumpLeaks());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ protected int networkFill(ByteBuffer input) throws IOException
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());

ArrayByteBufferPool bufferPool = (ArrayByteBufferPool)server.getByteBufferPool();
Pool<RetainableByteBuffer> bucket = bufferPool.poolFor(16 * 1024 + 1, connector.getConnectionFactory(HttpConnectionFactory.class).isUseInputDirectByteBuffers());
Pool<RetainableByteBuffer.Pooled> bucket = bufferPool.poolFor(16 * 1024 + 1, connector.getConnectionFactory(HttpConnectionFactory.class).isUseInputDirectByteBuffers());
assertEquals(1, bucket.size());
assertEquals(1, bucket.getIdleCount());

Expand All @@ -773,7 +773,7 @@ public void testEncryptedOutputBufferRepooling() throws Exception
ByteBufferPool bufferPool = new ByteBufferPool.Wrapper(new ArrayByteBufferPool())
{
@Override
public RetainableByteBuffer acquire(int size, boolean direct)
public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
{
RetainableByteBuffer.Wrapper buffer = new RetainableByteBuffer.Wrapper(super.acquire(size, direct))
{
Expand Down Expand Up @@ -843,7 +843,7 @@ public void testEncryptedOutputBufferRepoolingAfterNetworkFlushReturnsFalse(bool
ByteBufferPool bufferPool = new ByteBufferPool.Wrapper(new ArrayByteBufferPool())
{
@Override
public RetainableByteBuffer acquire(int size, boolean direct)
public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
{
RetainableByteBuffer.Wrapper buffer = new RetainableByteBuffer.Wrapper(super.acquire(size, direct))
{
Expand Down Expand Up @@ -928,7 +928,7 @@ public void testEncryptedOutputBufferRepoolingAfterNetworkFlushThrows(boolean cl
ByteBufferPool bufferPool = new ByteBufferPool.Wrapper(new ArrayByteBufferPool())
{
@Override
public RetainableByteBuffer acquire(int size, boolean direct)
public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
{
RetainableByteBuffer.Wrapper buffer = new RetainableByteBuffer.Wrapper(super.acquire(size, direct))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import static org.eclipse.jetty.io.Content.Source.asByteBuffer;
import static org.eclipse.jetty.toolchain.test.StackUtils.supply;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.eclipse.jetty.util.BufferUtil.toBuffer;
import static org.eclipse.jetty.util.BufferUtil.toHexString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -169,7 +171,7 @@ protected void process(MultiPartFormData.Parts parts) throws Exception
MultiPart.Part part = parts.iterator().next();
assertEquals(name, part.getName());
assertEquals("text/plain", part.getHeaders().get(HttpHeader.CONTENT_TYPE));
assertArrayEquals(data, Content.Source.asByteBuffer(part.getContentSource()).array());
assertEquals(toHexString(toBuffer(data)), toHexString(asByteBuffer(part.getContentSource())));
}
});

Expand Down Expand Up @@ -222,7 +224,7 @@ protected void process(MultiPartFormData.Parts parts) throws Exception
assertEquals(contentType, part.getHeaders().get(HttpHeader.CONTENT_TYPE));
assertEquals(fileName, part.getFileName());
assertEquals(data.length, part.getContentSource().getLength());
assertArrayEquals(data, Content.Source.asByteBuffer(part.getContentSource()).array());
assertEquals(toHexString(toBuffer(data)), toHexString(asByteBuffer(part.getContentSource())));
}
});

Expand Down Expand Up @@ -336,7 +338,7 @@ protected void process(MultiPartFormData.Parts parts) throws Exception
assertEquals("application/octet-stream", filePart.getHeaders().get(HttpHeader.CONTENT_TYPE));
assertEquals(tmpPath.getFileName().toString(), filePart.getFileName());
assertEquals(Files.size(tmpPath), filePart.getContentSource().getLength());
assertArrayEquals(data, Content.Source.asByteBuffer(filePart.getContentSource()).array());
assertEquals(toHexString(toBuffer(data)), toHexString(asByteBuffer(filePart.getContentSource())));
}
});

Expand Down Expand Up @@ -377,7 +379,7 @@ protected void process(MultiPartFormData.Parts parts) throws Exception
assertEquals("file", filePart.getName());
assertEquals("application/octet-stream", filePart.getHeaders().get(HttpHeader.CONTENT_TYPE));
assertEquals("fileName", filePart.getFileName());
assertArrayEquals(fileData, Content.Source.asByteBuffer(filePart.getContentSource()).array());
assertEquals(toHexString(toBuffer(fileData)), toHexString(asByteBuffer(filePart.getContentSource())));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public RetainableByteBuffer decode(ByteBuffer compressed)
RetainableByteBuffer result = acquire(length);
for (RetainableByteBuffer buffer : _inflateds)
{
BufferUtil.append(result.getByteBuffer(), buffer.getByteBuffer());
buffer.appendTo(result);
buffer.release();
}
_inflateds.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ else if (type != HttpTokens.Type.SPACE && type != HttpTokens.Type.HTAB)
if (state == State.EPILOGUE)
notifyComplete();
else
throw new EOFException("unexpected EOF");
throw new EOFException("unexpected EOF in " + state);
}
}
catch (Throwable x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@

package org.eclipse.jetty.http;

import java.nio.ByteBuffer;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.BufferUtil;

public class Trailers implements Content.Chunk
public class Trailers extends Content.Chunk.Empty
{
private final HttpFields trailers;

Expand All @@ -27,12 +24,6 @@ public Trailers(HttpFields trailers)
this.trailers = trailers;
}

@Override
public ByteBuffer getByteBuffer()
{
return BufferUtil.EMPTY_BUFFER;
}

@Override
public boolean isLast()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ interface Factory
HttpContent getContent(String path) throws IOException;
}

// TODO add a writeTo semantic, then update IOResources to use a RBB.Dynamic

/**
* HttpContent Wrapper.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public void before()
pool = new ByteBufferPool.Wrapper(new ArrayByteBufferPool())
{
@Override
public RetainableByteBuffer acquire(int size, boolean direct)
public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
{
counter.incrementAndGet();
return new RetainableByteBuffer.Wrapper(super.acquire(size, direct))
return new RetainableByteBuffer.Mutable.Wrapper(super.acquire(size, direct))
{
@Override
public boolean release()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.content.AsyncContent;
import org.eclipse.jetty.io.content.InputStreamContentSource;
import org.eclipse.jetty.toolchain.test.FS;
Expand Down Expand Up @@ -1601,26 +1602,19 @@ public Content.Chunk read()
}
}

private static class NonRetainableChunk implements Content.Chunk
private static class NonRetainableChunk extends RetainableByteBuffer.NonRetainableByteBuffer implements Content.Chunk
{
private final ByteBuffer _content;
private final boolean _isLast;
private final Throwable _failure;

public NonRetainableChunk(Content.Chunk chunk)
{
_content = BufferUtil.copy(chunk.getByteBuffer());
super(BufferUtil.copy(chunk.getByteBuffer()));
_isLast = chunk.isLast();
_failure = chunk.getFailure();
chunk.release();
}

@Override
public ByteBuffer getByteBuffer()
{
return _content;
}

@Override
public boolean isLast()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public void onHeaders(HeadersFrame frame)
@Override
public void onData(DataFrame frame)
{
NetworkBuffer networkBuffer = producer.networkBuffer;
RetainableByteBuffer.Mutable networkBuffer = producer.networkBuffer;
session.onData(new StreamData(frame, networkBuffer));
}

Expand Down Expand Up @@ -304,15 +304,15 @@ public void onConnectionFailure(int error, String reason)
protected class HTTP2Producer implements ExecutionStrategy.Producer
{
private final Callback fillableCallback = new FillableCallback();
private NetworkBuffer networkBuffer;
private RetainableByteBuffer.Mutable networkBuffer;
private boolean shutdown;
private boolean failed;

private void setInputBuffer(ByteBuffer byteBuffer)
{
acquireNetworkBuffer();
// TODO handle buffer overflow?
networkBuffer.put(byteBuffer);
if (!networkBuffer.append(byteBuffer))
LOG.warn("overflow");
}

@Override
Expand All @@ -339,7 +339,7 @@ public Runnable produce()
{
while (networkBuffer.hasRemaining())
{
session.getParser().parse(networkBuffer.getBuffer());
session.getParser().parse(networkBuffer.getByteBuffer());
if (failed)
return null;
}
Expand All @@ -357,7 +357,7 @@ public Runnable produce()

// Here we know that this.networkBuffer is not retained by
// application code: either it has been released, or it's a new one.
int filled = fill(getEndPoint(), networkBuffer.getBuffer());
int filled = fill(getEndPoint(), networkBuffer.getByteBuffer());
if (LOG.isDebugEnabled())
LOG.debug("Filled {} bytes in {}", filled, networkBuffer);

Expand Down Expand Up @@ -391,30 +391,30 @@ private void acquireNetworkBuffer()
{
if (networkBuffer == null)
{
networkBuffer = new NetworkBuffer();
networkBuffer = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers()).asMutable();
if (LOG.isDebugEnabled())
LOG.debug("Acquired {}", networkBuffer);
}
}

private void reacquireNetworkBuffer()
{
NetworkBuffer currentBuffer = networkBuffer;
RetainableByteBuffer.Mutable currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();

if (currentBuffer.hasRemaining())
throw new IllegalStateException();

currentBuffer.release();
networkBuffer = new NetworkBuffer();
networkBuffer = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("Reacquired {}<-{}", currentBuffer, networkBuffer);
}

private void releaseNetworkBuffer()
{
NetworkBuffer currentBuffer = networkBuffer;
RetainableByteBuffer.Mutable currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();

Expand Down Expand Up @@ -472,69 +472,21 @@ public boolean canRetain()
}

@Override
public void retain()
{
retainable.retain();
}

@Override
public boolean release()
{
return retainable.release();
}
}

private class NetworkBuffer implements Retainable
{
private final RetainableByteBuffer delegate;

private NetworkBuffer()
{
delegate = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers());
}

public ByteBuffer getBuffer()
{
return delegate.getByteBuffer();
}

public boolean isRetained()
{
return delegate.isRetained();
}

public boolean hasRemaining()
{
return delegate.hasRemaining();
}

@Override
public boolean canRetain()
{
return delegate.canRetain();
return retainable.isRetained();
}

@Override
public void retain()
{
delegate.retain();
retainable.retain();
}

@Override
public boolean release()
{
if (delegate.release())
{
if (LOG.isDebugEnabled())
LOG.debug("Released retained {}", this);
return true;
}
return false;
}

private void put(ByteBuffer source)
{
BufferUtil.append(delegate.getByteBuffer(), source);
return retainable.release();
}
}
}
Loading

0 comments on commit 36538d6

Please sign in to comment.