async file reader

This commit is contained in:
Jörg Prante 2022-04-26 22:39:23 +02:00
parent 83e75bec05
commit 0ea9f1c034
3 changed files with 58 additions and 62 deletions

13
NOTICE.txt Normal file
View file

@ -0,0 +1,13 @@
The work in org.xbib.event.async is based upon the work in
https://github.com/javasync/RxIo
and org.xbib.event.yield is based upon the work in
https://github.com/tinyield/jayield
by Fernando Miguel Gamboa de Carvalho (fmcarvalho)
branched as of 23 December, 2021
License: Apache 2.0

View file

@ -16,47 +16,17 @@ import static java.nio.channels.AsynchronousFileChannel.open;
* Asynchronous non-blocking read operations that use an underlying AsynchronousFileChannel. * Asynchronous non-blocking read operations that use an underlying AsynchronousFileChannel.
*/ */
public abstract class AbstractAsyncFileReaderLines { public abstract class AbstractAsyncFileReaderLines {
static final int BUFFER_SIZE = 4096 * 8; // the transfer buffer size
static final int BUFFER_SIZE = 8192;
private static final int MAX_LINE_SIZE = 4096; private static final int MAX_LINE_SIZE = 4096;
private static final int LF = '\n'; private static final int LF = '\n';
private static final int CR = '\r'; private static final int CR = '\r';
//
// This flag will track whether this `Subscription` is to be considered cancelled or not.
private boolean cancelled = false; private boolean cancelled = false;
/**
* Asynchronous read chunk operation, callback based.
*/
private static void readBytes(
AsynchronousFileChannel asyncFile,
long position, // current read or write position in file
byte[] data, // buffer for current producing line
int size,
ObjIntConsumer<Throwable> completed) {
if (completed == null)
throw new InvalidParameterException("callback can't be null!");
if (size > data.length)
size = data.length;
if (size == 0) {
completed.accept(null, 0);
return;
}
ByteBuffer buf = ByteBuffer.wrap(data, 0, size);
CompletionHandler<Integer, Object> readCompleted =
new CompletionHandler<>() {
@Override
public void completed(Integer result, Object attachment) {
completed.accept(null, result);
}
@Override
public void failed(Throwable exc, Object attachment) {
completed.accept(exc, 0);
}
};
asyncFile.read(buf, position, null, readCompleted);
}
protected abstract void onError(Throwable error); protected abstract void onError(Throwable error);
protected abstract void onComplete(); protected abstract void onComplete();
@ -81,11 +51,9 @@ public abstract class AbstractAsyncFileReaderLines {
* The resulting characters are parsed by line and passed to the destination buffer. * The resulting characters are parsed by line and passed to the destination buffer.
* *
* @param asyncFile the nio associated file channel. * @param asyncFile the nio associated file channel.
* @param bufferSize * @param bufferSize buffer size
*/ */
final void readLines( final void readLines(AsynchronousFileChannel asyncFile, int bufferSize) {
AsynchronousFileChannel asyncFile,
int bufferSize) {
readLines(asyncFile, 0, 0, 0, new byte[bufferSize], new byte[MAX_LINE_SIZE], 0); readLines(asyncFile, 0, 0, 0, new byte[bufferSize], new byte[MAX_LINE_SIZE], 0);
} }
@ -102,14 +70,8 @@ public abstract class AbstractAsyncFileReaderLines {
* @param auxline the transfer buffer. * @param auxline the transfer buffer.
* @param linepos current position in producing line. * @param linepos current position in producing line.
*/ */
private void readLines( private void readLines(AsynchronousFileChannel asyncFile, long position, int bufpos,
AsynchronousFileChannel asyncFile, int bufsize, byte[] buffer, byte[] auxline, int linepos)
long position, // current read or write position in file
int bufpos, // read position in buffer
int bufsize, // total bytes in buffer
byte[] buffer, // buffer for current producing line
byte[] auxline, // the transfer buffer
int linepos) // current position in producing line
{ {
while (bufpos < bufsize) { while (bufpos < bufsize) {
if (buffer[bufpos] == LF) { if (buffer[bufpos] == LF) {
@ -122,7 +84,7 @@ public abstract class AbstractAsyncFileReaderLines {
linepos = 0; linepos = 0;
} else auxline[linepos++] = buffer[bufpos++]; } else auxline[linepos++] = buffer[bufpos++];
} }
int lastLinePos = linepos; // we need a final variable captured in the next lambda int lastLinePos = linepos;
if (!isCancelled()) readBytes(asyncFile, position, buffer, buffer.length, (err, res) -> { if (!isCancelled()) readBytes(asyncFile, position, buffer, buffer.length, (err, res) -> {
if (isCancelled()) if (isCancelled())
return; return;
@ -132,11 +94,9 @@ public abstract class AbstractAsyncFileReaderLines {
return; return;
} }
if (res <= 0) { if (res <= 0) {
// needed for last line that doesn't end with LF
if (lastLinePos > 0) { if (lastLinePos > 0) {
produceLine(auxline, lastLinePos); produceLine(auxline, lastLinePos);
} }
// Following it will invoke onComplete()
close(asyncFile); close(asyncFile);
} else { } else {
readLines(asyncFile, position + res, 0, res, buffer, auxline, lastLinePos); readLines(asyncFile, position + res, 0, res, buffer, auxline, lastLinePos);
@ -144,17 +104,11 @@ public abstract class AbstractAsyncFileReaderLines {
}); });
} }
/**
* Performed from the IO background thread when it reached the end of the file.
*
* @param asyncFile
*/
private void close(AsynchronousFileChannel asyncFile) { private void close(AsynchronousFileChannel asyncFile) {
try { try {
asyncFile.close(); asyncFile.close();
} catch (IOException e) { } catch (IOException e) {
onError(e); // Failed terminal state. onError(e);
// Emission has finished. Does not propagate error on CompletableFuture.
} finally { } finally {
onComplete(); onComplete();
} }
@ -170,4 +124,34 @@ public abstract class AbstractAsyncFileReaderLines {
String line = new String(auxline, 0, linepos, StandardCharsets.UTF_8); String line = new String(auxline, 0, linepos, StandardCharsets.UTF_8);
onProduceLine(line); onProduceLine(line);
} }
/**
* Asynchronous read chunk operation, callback based.
*/
private static void readBytes(AsynchronousFileChannel asyncFile,
long position, byte[] data, int size, ObjIntConsumer<Throwable> completed) {
if (completed == null) {
throw new InvalidParameterException("callback can't be null!");
}
if (size > data.length) {
size = data.length;
}
if (size == 0) {
completed.accept(null, 0);
return;
}
ByteBuffer buf = ByteBuffer.wrap(data, 0, size);
CompletionHandler<Integer, Object> readCompleted = new CompletionHandler<>() {
@Override
public void completed(Integer result, Object attachment) {
completed.accept(null, result);
}
@Override
public void failed(Throwable exc, Object attachment) {
completed.accept(exc, 0);
}
};
asyncFile.read(buf, position, null, readCompleted);
}
} }

View file

@ -21,8 +21,7 @@ public class AsyncFileReaderBytes {
int position, int position,
ByteArrayOutputStream out) { ByteArrayOutputStream out) {
return readToByteArrayStream(asyncFile, buffer, position, out) return readToByteArrayStream(asyncFile, buffer, position, out)
.thenCompose(index -> .thenCompose(index -> index < 0
index < 0
? completedFuture(position) ? completedFuture(position)
: readAllBytes(asyncFile, buffer.clear(), position + index, out)); : readAllBytes(asyncFile, buffer.clear(), position + index, out));
@ -34,12 +33,12 @@ public class AsyncFileReaderBytes {
int position, int position,
ByteArrayOutputStream out) { ByteArrayOutputStream out) {
CompletableFuture<Integer> promise = new CompletableFuture<>(); CompletableFuture<Integer> promise = new CompletableFuture<>();
asyncFile.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() { asyncFile.read(buffer, position, buffer, new CompletionHandler<>() {
@Override @Override
public void completed(Integer result, ByteBuffer attachment) { public void completed(Integer result, ByteBuffer attachment) {
if (result > 0) { if (result > 0) {
attachment.flip(); attachment.flip();
byte[] data = new byte[attachment.limit()]; // limit = result byte[] data = new byte[attachment.limit()];
attachment.get(data); attachment.get(data);
write(out, data); write(out, data);
} }