From 7a3ccf94a449bc485a84bf6660914bfd584589c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Prante?= Date: Mon, 5 Dec 2022 13:30:06 +0100 Subject: [PATCH] add record split write to MarcWriter --- src/main/java/org/xbib/marc/MarcWriter.java | 103 ++++++++++++++++++-- 1 file changed, 95 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/xbib/marc/MarcWriter.java b/src/main/java/org/xbib/marc/MarcWriter.java index 8b24f22..145f0f0 100644 --- a/src/main/java/org/xbib/marc/MarcWriter.java +++ b/src/main/java/org/xbib/marc/MarcWriter.java @@ -26,30 +26,45 @@ import java.io.Closeable; import java.io.Flushable; import java.io.IOException; import java.io.OutputStream; -import java.io.UncheckedIOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import java.util.zip.Deflater; +import java.util.zip.GZIPOutputStream; /** * An ISO 2709 "stream format" MARC writer. */ public class MarcWriter extends MarcContentHandler implements Flushable, Closeable { - private static final int DEFAULT_BUFFER_SIZE = 8192; + private static final int DEFAULT_BUFFER_SIZE = 65536; - private final ReentrantLock lock = new ReentrantLock(true); + private final ReentrantLock lock; private final BytesStreamOutput bytesStreamOutput; - private final SeparatorOutputStream out; - private final Charset charset; + private SeparatorOutputStream out; + private boolean fatalErrors; private Exception exception; + private String fileNamePattern; + + private AtomicInteger fileNameCounter; + + private int splitlimit; + + private int bufferSize; + + private boolean compress; + /** * Create a MarcWriter on an underlying output stream. * @param out the underlying output stream @@ -69,6 +84,19 @@ public class MarcWriter extends MarcContentHandler implements Flushable, Closeab this.out = new SeparatorOutputStream(out, buffersize); this.charset = charset; this.bytesStreamOutput = new BytesStreamOutput(); + this.lock = new ReentrantLock(); + } + + public MarcWriter(String fileNamePattern, Charset charset, int bufferSize, int splitlimit, boolean compress) throws IOException { + this.fileNameCounter = new AtomicInteger(0); + this.fileNamePattern = fileNamePattern; + this.splitlimit = splitlimit; + this.bufferSize = bufferSize; + this.compress = compress; + this.charset = charset; + this.bytesStreamOutput = new BytesStreamOutput(); + this.lock = new ReentrantLock(); + newOut(fileNamePattern, fileNameCounter, bufferSize, compress); } @Override @@ -167,7 +195,8 @@ public class MarcWriter extends MarcContentHandler implements Flushable, Closeab // of the record, following the field terminator of the last data field." // https://www.loc.gov/marc/specifications/specrecstruc.html out.chunk(new DefaultChunk(InformationSeparator.GS, null)); - } catch (IOException e) { + afterRecord(); + } catch (Exception e) { handleException(e); } finally { lock.unlock(); @@ -190,10 +219,68 @@ public class MarcWriter extends MarcContentHandler implements Flushable, Closeab return exception; } - private void handleException(IOException e) { + private void handleException(Exception e) { exception = e; if (fatalErrors) { - throw new UncheckedIOException(e); + throw new IllegalStateException(e); + } + } + + @Override + public void record(MarcRecord marcRecord) { + if (exception != null) { + return; + } + lock.lock(); + try { + super.record(marcRecord); + } catch (Exception e) { + handleException(e); + } finally { + lock.unlock(); + } + } + + /** + * Split records if configured. + */ + private void afterRecord() { + if (fileNamePattern != null) { + if (splitlimit != -1) { + if (getRecordCounter() % splitlimit == 0) { + if (out != null) { + try { + endCollection(); + endDocument(); + out.close(); + newOut(fileNamePattern, fileNameCounter, bufferSize, compress); + startDocument(); + beginCollection(); + } catch (Exception e) { + handleException(e); + } + } + } + } + } + } + + private void newOut(String fileNamePattern, AtomicInteger fileNameCounter, int bufferSize, boolean compress) + throws IOException { + String name = String.format(fileNamePattern, fileNameCounter.getAndIncrement()); + OutputStream outputStream = Files.newOutputStream(Paths.get(name), StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING); + out = new SeparatorOutputStream(compress ? new CompressedOutputStream(outputStream, bufferSize) : outputStream, bufferSize); + } + + /** + * A GZIP output stream, modified for best compression. + */ + private static class CompressedOutputStream extends GZIPOutputStream { + + CompressedOutputStream(OutputStream out, int size) throws IOException { + super(out, size, true); + def.setLevel(Deflater.BEST_COMPRESSION); } } }