diff --git a/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/buffer/NettyDataBufferOutputStream.java b/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/buffer/NettyDataBufferOutputStream.java new file mode 100644 index 0000000..9ef66f6 --- /dev/null +++ b/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/buffer/NettyDataBufferOutputStream.java @@ -0,0 +1,132 @@ +package org.xbib.net.http.server.netty.buffer; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +public class NettyDataBufferOutputStream extends OutputStream { + + private ByteBuf pooledBuffer; + + private long written; + + private final long maxContentLength; + + private final AtomicBoolean closed; + + private final AtomicBoolean writeStarted; + + private final ByteBufAllocator byteBufAllocator; + + private final Consumer consumer; + + public NettyDataBufferOutputStream(ByteBufAllocator byteBufAllocator, + int maxContentLength, + Consumer consumer) { + this.byteBufAllocator = byteBufAllocator; + this.maxContentLength = maxContentLength; + this.consumer = consumer; + this.closed = new AtomicBoolean(); + this.writeStarted = new AtomicBoolean(); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(final int b) throws IOException { + write(new byte[]{(byte) b}, 0, 1); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + if (len < 1) { + return; + } + if (closed.get()) { + throw new IOException("Stream is closed"); + } + int remaining = len; + int index = off; + ByteBuf buffer = pooledBuffer; + try { + if (buffer == null) { + pooledBuffer = buffer = byteBufAllocator.buffer(); + } + while (remaining > 0) { + int toWrite = Math.min(remaining, buffer.writableBytes()); + buffer.writeBytes(b, index, toWrite); + remaining -= toWrite; + index += toWrite; + if (!buffer.isWritable()) { + writeStarted.set(true); + ByteBuf currentBuffer = buffer; + pooledBuffer = buffer = byteBufAllocator.buffer(); + consumer.accept(currentBuffer); + } + } + } catch (Exception e) { + if (buffer != null) { + buffer.release(); + } + throw new IOException(e); + } + this.written += len; + if (maxContentLength != -1 && this.written >= maxContentLength) { + close(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void flush() throws IOException { + if (closed.get()) { + throw new IOException("Stream is closed"); + } + internalFlush(); + } + + /** + * {@inheritDoc} + */ + @Override + public void close() throws IOException { + if (closed.get()) { + return; + } + closed.set(true); + internalFlush(); + } + + private void internalFlush() throws IOException { + try { + if (pooledBuffer != null) { + consumer.accept(pooledBuffer); + } + } catch (Exception e) { + throw new IOException(e); + } finally { + if (pooledBuffer != null) { + pooledBuffer.release(); + pooledBuffer = null; + } + } + } +}