add record split write to MarcWriter

This commit is contained in:
Jörg Prante 2022-12-05 13:30:06 +01:00
parent d2905637c1
commit 7a3ccf94a4

View file

@ -26,30 +26,45 @@ import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream;
/**
* An ISO 2709 "stream format" MARC writer.
*/
public class MarcWriter extends MarcContentHandler implements Flushable, Closeable {
private static final int DEFAULT_BUFFER_SIZE = 8192;
private static final int DEFAULT_BUFFER_SIZE = 65536;
private final ReentrantLock lock = new ReentrantLock(true);
private final ReentrantLock lock;
private final BytesStreamOutput bytesStreamOutput;
private final SeparatorOutputStream out;
private final Charset charset;
private SeparatorOutputStream out;
private boolean fatalErrors;
private Exception exception;
private String fileNamePattern;
private AtomicInteger fileNameCounter;
private int splitlimit;
private int bufferSize;
private boolean compress;
/**
* Create a MarcWriter on an underlying output stream.
* @param out the underlying output stream
@ -69,6 +84,19 @@ public class MarcWriter extends MarcContentHandler implements Flushable, Closeab
this.out = new SeparatorOutputStream(out, buffersize);
this.charset = charset;
this.bytesStreamOutput = new BytesStreamOutput();
this.lock = new ReentrantLock();
}
public MarcWriter(String fileNamePattern, Charset charset, int bufferSize, int splitlimit, boolean compress) throws IOException {
this.fileNameCounter = new AtomicInteger(0);
this.fileNamePattern = fileNamePattern;
this.splitlimit = splitlimit;
this.bufferSize = bufferSize;
this.compress = compress;
this.charset = charset;
this.bytesStreamOutput = new BytesStreamOutput();
this.lock = new ReentrantLock();
newOut(fileNamePattern, fileNameCounter, bufferSize, compress);
}
@Override
@ -167,7 +195,8 @@ public class MarcWriter extends MarcContentHandler implements Flushable, Closeab
// of the record, following the field terminator of the last data field."
// https://www.loc.gov/marc/specifications/specrecstruc.html
out.chunk(new DefaultChunk(InformationSeparator.GS, null));
} catch (IOException e) {
afterRecord();
} catch (Exception e) {
handleException(e);
} finally {
lock.unlock();
@ -190,10 +219,68 @@ public class MarcWriter extends MarcContentHandler implements Flushable, Closeab
return exception;
}
private void handleException(IOException e) {
private void handleException(Exception e) {
exception = e;
if (fatalErrors) {
throw new UncheckedIOException(e);
throw new IllegalStateException(e);
}
}
@Override
public void record(MarcRecord marcRecord) {
if (exception != null) {
return;
}
lock.lock();
try {
super.record(marcRecord);
} catch (Exception e) {
handleException(e);
} finally {
lock.unlock();
}
}
/**
* Split records if configured.
*/
private void afterRecord() {
if (fileNamePattern != null) {
if (splitlimit != -1) {
if (getRecordCounter() % splitlimit == 0) {
if (out != null) {
try {
endCollection();
endDocument();
out.close();
newOut(fileNamePattern, fileNameCounter, bufferSize, compress);
startDocument();
beginCollection();
} catch (Exception e) {
handleException(e);
}
}
}
}
}
}
private void newOut(String fileNamePattern, AtomicInteger fileNameCounter, int bufferSize, boolean compress)
throws IOException {
String name = String.format(fileNamePattern, fileNameCounter.getAndIncrement());
OutputStream outputStream = Files.newOutputStream(Paths.get(name), StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);
out = new SeparatorOutputStream(compress ? new CompressedOutputStream(outputStream, bufferSize) : outputStream, bufferSize);
}
/**
* A GZIP output stream, modified for best compression.
*/
private static class CompressedOutputStream extends GZIPOutputStream {
CompressedOutputStream(OutputStream out, int size) throws IOException {
super(out, size, true);
def.setLevel(Deflater.BEST_COMPRESSION);
}
}
}