diff --git a/NOTICE.txt b/NOTICE.txt new file mode 100644 index 0000000..0088fc5 --- /dev/null +++ b/NOTICE.txt @@ -0,0 +1,13 @@ +The work in org.xbib.event.async is based upon the work in + +https://github.com/javasync/RxIo + +and org.xbib.event.yield is based upon the work in + +https://github.com/tinyield/jayield + +by Fernando Miguel Gamboa de Carvalho (fmcarvalho) + +branched as of 23 December, 2021 + +License: Apache 2.0 diff --git a/src/main/java/org/xbib/event/async/AbstractAsyncFileReaderLines.java b/src/main/java/org/xbib/event/async/AbstractAsyncFileReaderLines.java index 45e6081..9360afd 100644 --- a/src/main/java/org/xbib/event/async/AbstractAsyncFileReaderLines.java +++ b/src/main/java/org/xbib/event/async/AbstractAsyncFileReaderLines.java @@ -16,47 +16,17 @@ import static java.nio.channels.AsynchronousFileChannel.open; * Asynchronous non-blocking read operations that use an underlying AsynchronousFileChannel. */ public abstract class AbstractAsyncFileReaderLines { - static final int BUFFER_SIZE = 4096 * 8; // the transfer buffer size + + static final int BUFFER_SIZE = 8192; + private static final int MAX_LINE_SIZE = 4096; + private static final int LF = '\n'; + private static final int CR = '\r'; - // - // This flag will track whether this `Subscription` is to be considered cancelled or not. + private boolean cancelled = false; - /** - * Asynchronous read chunk operation, callback based. - */ - private static void readBytes( - AsynchronousFileChannel asyncFile, - long position, // current read or write position in file - byte[] data, // buffer for current producing line - int size, - ObjIntConsumer completed) { - if (completed == null) - throw new InvalidParameterException("callback can't be null!"); - if (size > data.length) - size = data.length; - if (size == 0) { - completed.accept(null, 0); - return; - } - ByteBuffer buf = ByteBuffer.wrap(data, 0, size); - CompletionHandler readCompleted = - new CompletionHandler<>() { - @Override - public void completed(Integer result, Object attachment) { - completed.accept(null, result); - } - - @Override - public void failed(Throwable exc, Object attachment) { - completed.accept(exc, 0); - } - }; - asyncFile.read(buf, position, null, readCompleted); - } - protected abstract void onError(Throwable error); protected abstract void onComplete(); @@ -81,11 +51,9 @@ public abstract class AbstractAsyncFileReaderLines { * The resulting characters are parsed by line and passed to the destination buffer. * * @param asyncFile the nio associated file channel. - * @param bufferSize + * @param bufferSize buffer size */ - final void readLines( - AsynchronousFileChannel asyncFile, - int bufferSize) { + final void readLines(AsynchronousFileChannel asyncFile, int bufferSize) { readLines(asyncFile, 0, 0, 0, new byte[bufferSize], new byte[MAX_LINE_SIZE], 0); } @@ -102,14 +70,8 @@ public abstract class AbstractAsyncFileReaderLines { * @param auxline the transfer buffer. * @param linepos current position in producing line. */ - private void readLines( - AsynchronousFileChannel asyncFile, - long position, // current read or write position in file - int bufpos, // read position in buffer - int bufsize, // total bytes in buffer - byte[] buffer, // buffer for current producing line - byte[] auxline, // the transfer buffer - int linepos) // current position in producing line + private void readLines(AsynchronousFileChannel asyncFile, long position, int bufpos, + int bufsize, byte[] buffer, byte[] auxline, int linepos) { while (bufpos < bufsize) { if (buffer[bufpos] == LF) { @@ -122,7 +84,7 @@ public abstract class AbstractAsyncFileReaderLines { linepos = 0; } else auxline[linepos++] = buffer[bufpos++]; } - int lastLinePos = linepos; // we need a final variable captured in the next lambda + int lastLinePos = linepos; if (!isCancelled()) readBytes(asyncFile, position, buffer, buffer.length, (err, res) -> { if (isCancelled()) return; @@ -132,11 +94,9 @@ public abstract class AbstractAsyncFileReaderLines { return; } if (res <= 0) { - // needed for last line that doesn't end with LF if (lastLinePos > 0) { produceLine(auxline, lastLinePos); } - // Following it will invoke onComplete() close(asyncFile); } else { readLines(asyncFile, position + res, 0, res, buffer, auxline, lastLinePos); @@ -144,17 +104,11 @@ public abstract class AbstractAsyncFileReaderLines { }); } - /** - * Performed from the IO background thread when it reached the end of the file. - * - * @param asyncFile - */ private void close(AsynchronousFileChannel asyncFile) { try { asyncFile.close(); } catch (IOException e) { - onError(e); // Failed terminal state. - // Emission has finished. Does not propagate error on CompletableFuture. + onError(e); } finally { onComplete(); } @@ -170,4 +124,34 @@ public abstract class AbstractAsyncFileReaderLines { String line = new String(auxline, 0, linepos, StandardCharsets.UTF_8); onProduceLine(line); } + + /** + * Asynchronous read chunk operation, callback based. + */ + private static void readBytes(AsynchronousFileChannel asyncFile, + long position, byte[] data, int size, ObjIntConsumer completed) { + if (completed == null) { + throw new InvalidParameterException("callback can't be null!"); + } + if (size > data.length) { + size = data.length; + } + if (size == 0) { + completed.accept(null, 0); + return; + } + ByteBuffer buf = ByteBuffer.wrap(data, 0, size); + CompletionHandler readCompleted = new CompletionHandler<>() { + @Override + public void completed(Integer result, Object attachment) { + completed.accept(null, result); + } + + @Override + public void failed(Throwable exc, Object attachment) { + completed.accept(exc, 0); + } + }; + asyncFile.read(buf, position, null, readCompleted); + } } diff --git a/src/main/java/org/xbib/event/async/AsyncFileReaderBytes.java b/src/main/java/org/xbib/event/async/AsyncFileReaderBytes.java index 227082b..7f6b429 100644 --- a/src/main/java/org/xbib/event/async/AsyncFileReaderBytes.java +++ b/src/main/java/org/xbib/event/async/AsyncFileReaderBytes.java @@ -21,8 +21,7 @@ public class AsyncFileReaderBytes { int position, ByteArrayOutputStream out) { return readToByteArrayStream(asyncFile, buffer, position, out) - .thenCompose(index -> - index < 0 + .thenCompose(index -> index < 0 ? completedFuture(position) : readAllBytes(asyncFile, buffer.clear(), position + index, out)); @@ -34,12 +33,12 @@ public class AsyncFileReaderBytes { int position, ByteArrayOutputStream out) { CompletableFuture promise = new CompletableFuture<>(); - asyncFile.read(buffer, position, buffer, new CompletionHandler() { + asyncFile.read(buffer, position, buffer, new CompletionHandler<>() { @Override public void completed(Integer result, ByteBuffer attachment) { if (result > 0) { attachment.flip(); - byte[] data = new byte[attachment.limit()]; // limit = result + byte[] data = new byte[attachment.limit()]; attachment.get(data); write(out, data); }