add a Netty ByteBuf based outputstream with consumer

This commit is contained in:
Jörg Prante 2024-01-02 11:52:39 +01:00
parent 41cca5c606
commit fa3fa2e958

View file

@ -0,0 +1,132 @@
package org.xbib.net.http.server.netty.buffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
public class NettyDataBufferOutputStream extends OutputStream {
private ByteBuf pooledBuffer;
private long written;
private final long maxContentLength;
private final AtomicBoolean closed;
private final AtomicBoolean writeStarted;
private final ByteBufAllocator byteBufAllocator;
private final Consumer<ByteBuf> consumer;
public NettyDataBufferOutputStream(ByteBufAllocator byteBufAllocator,
int maxContentLength,
Consumer<ByteBuf> consumer) {
this.byteBufAllocator = byteBufAllocator;
this.maxContentLength = maxContentLength;
this.consumer = consumer;
this.closed = new AtomicBoolean();
this.writeStarted = new AtomicBoolean();
}
/**
* {@inheritDoc}
*/
@Override
public void write(final int b) throws IOException {
write(new byte[]{(byte) b}, 0, 1);
}
/**
* {@inheritDoc}
*/
@Override
public void write(final byte[] b) throws IOException {
write(b, 0, b.length);
}
/**
* {@inheritDoc}
*/
@Override
public void write(final byte[] b, final int off, final int len) throws IOException {
if (len < 1) {
return;
}
if (closed.get()) {
throw new IOException("Stream is closed");
}
int remaining = len;
int index = off;
ByteBuf buffer = pooledBuffer;
try {
if (buffer == null) {
pooledBuffer = buffer = byteBufAllocator.buffer();
}
while (remaining > 0) {
int toWrite = Math.min(remaining, buffer.writableBytes());
buffer.writeBytes(b, index, toWrite);
remaining -= toWrite;
index += toWrite;
if (!buffer.isWritable()) {
writeStarted.set(true);
ByteBuf currentBuffer = buffer;
pooledBuffer = buffer = byteBufAllocator.buffer();
consumer.accept(currentBuffer);
}
}
} catch (Exception e) {
if (buffer != null) {
buffer.release();
}
throw new IOException(e);
}
this.written += len;
if (maxContentLength != -1 && this.written >= maxContentLength) {
close();
}
}
/**
* {@inheritDoc}
*/
@Override
public void flush() throws IOException {
if (closed.get()) {
throw new IOException("Stream is closed");
}
internalFlush();
}
/**
* {@inheritDoc}
*/
@Override
public void close() throws IOException {
if (closed.get()) {
return;
}
closed.set(true);
internalFlush();
}
private void internalFlush() throws IOException {
try {
if (pooledBuffer != null) {
consumer.accept(pooledBuffer);
}
} catch (Exception e) {
throw new IOException(e);
} finally {
if (pooledBuffer != null) {
pooledBuffer.release();
pooledBuffer = null;
}
}
}
}