diff --git a/bin/bulk.sh b/bin/bulk.sh
new file mode 100644
index 0000000..edfcfb4
--- /dev/null
+++ b/bin/bulk.sh
@@ -0,0 +1,12 @@
+#!/usr/bin/env bash
+
+# This example script sends gzip-compressed JSON Lines files to the Elasticsearch bulk endpoint.
+# It assumes the index settings and the mappings are already created and configured.
+
+# Warning: bulk responses are not evaluated. To send large volumes without errors, more
+# precautions have to be considered.
+
+for f in build/*.jsonl.gz; do
+    curl -XPOST -H "Accept-Encoding: gzip" -H "Content-Encoding: gzip" \
+        --data-binary "@$f" --compressed localhost:9200/_bulk
+done
diff --git a/src/main/java/org/xbib/marc/json/MarcJsonWriter.java b/src/main/java/org/xbib/marc/json/MarcJsonWriter.java
index 395923c..6df7251 100644
--- a/src/main/java/org/xbib/marc/json/MarcJsonWriter.java
+++ b/src/main/java/org/xbib/marc/json/MarcJsonWriter.java
@@ -24,9 +24,9 @@ import org.xbib.marc.label.RecordLabel;
 import org.xbib.marc.transformer.value.MarcValueTransformers;
 import org.xbib.marc.xml.MarcContentHandler;
 
+import java.io.BufferedOutputStream;
 import java.io.BufferedWriter;
 import java.io.Closeable;
-import java.io.FileWriter;
 import java.io.Flushable;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -34,6 +34,9 @@ import java.io.OutputStreamWriter;
 import java.io.UncheckedIOException;
 import java.io.Writer;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +47,8 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.zip.Deflater;
+import java.util.zip.GZIPOutputStream;
 
 /**
  * This Marc Writer is a MarcContentHandler that writes Marc events to JSON.
@@ -52,7 +57,7 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
 
     private static final Logger logger = Logger.getLogger(MarcJsonWriter.class.getName());
 
-    private static final int DEFAULT_BUFFER_SIZE = 8192;
+    private static final int DEFAULT_BUFFER_SIZE = 65536;
 
     public static final String LEADER_TAG = "_LEADER";
 
@@ -64,13 +69,13 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
 
     private final StringBuilder sb;
 
-    private BufferedWriter writer;
+    private Writer writer;
 
     private Marc.Builder builder;
 
     private boolean fatalErrors;
 
-    private boolean jsonlines;
+    private Style style;
 
     private Exception exception;
 
@@ -82,31 +87,37 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
 
     private int bufferSize;
 
+    private boolean compress;
+
+    private String index;
+
+    private String indexType;
+
     /**
      * Flag for indicating if writer is at top of file.
      */
     private boolean top;
 
     public MarcJsonWriter(OutputStream out) throws IOException {
-        this(out, false);
+        this(out, Style.ARRAY);
     }
 
-    public MarcJsonWriter(OutputStream out, boolean jsonlines) throws IOException {
-        this(out, DEFAULT_BUFFER_SIZE, jsonlines);
+    public MarcJsonWriter(OutputStream out, Style style) throws IOException {
+        this(out, DEFAULT_BUFFER_SIZE, style);
     }
 
-    public MarcJsonWriter(OutputStream out, int bufferSize, boolean jsonlines) throws IOException {
-        this(new OutputStreamWriter(out, StandardCharsets.UTF_8), bufferSize, jsonlines);
+    public MarcJsonWriter(OutputStream out, int bufferSize, Style style) throws IOException {
+        this(new OutputStreamWriter(out, StandardCharsets.UTF_8), style, bufferSize);
     }
 
     public MarcJsonWriter(Writer writer) throws IOException {
-        this(writer, DEFAULT_BUFFER_SIZE, false);
+        this(writer, Style.ARRAY, DEFAULT_BUFFER_SIZE);
     }
 
-    public MarcJsonWriter(Writer writer, int bufferSize, boolean jsonlines) throws IOException {
+    public MarcJsonWriter(Writer writer, Style style, int bufferSize) throws IOException {
         this.writer = new BufferedWriter(writer, bufferSize);
         this.bufferSize = bufferSize;
-        this.jsonlines = jsonlines;
+        this.style = style;
         this.lock = new ReentrantLock();
         this.sb = new StringBuilder();
         this.builder = Marc.builder();
@@ -114,20 +125,32 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
     }
 
     public MarcJsonWriter(String fileNamePattern, int splitlimit) throws IOException {
-        this(fileNamePattern, DEFAULT_BUFFER_SIZE, splitlimit);
+        this(fileNamePattern, splitlimit, Style.LINES, DEFAULT_BUFFER_SIZE, false);
     }
 
-    public MarcJsonWriter(String fileNamePattern, int bufferSize, int splitlimit) throws IOException {
+    public MarcJsonWriter(String fileNamePattern, int splitlimit, Style style) throws IOException {
+        this(fileNamePattern, splitlimit, style, DEFAULT_BUFFER_SIZE, false);
+    }
+
+    public MarcJsonWriter(String fileNamePattern, int splitlimit, Style style, int bufferSize, boolean compress)
+            throws IOException {
         this.fileNameCounter = new AtomicInteger(0);
         this.fileNamePattern = fileNamePattern;
         this.splitlimit = splitlimit;
-        this.writer = newWriter(fileNamePattern, fileNameCounter, bufferSize);
         this.bufferSize = bufferSize;
         this.lock = new ReentrantLock();
         this.sb = new StringBuilder();
         this.builder = Marc.builder();
         this.top = true;
-        this.jsonlines = true;
+        this.style = style;
+        this.compress = compress;
+        newWriter(fileNamePattern, fileNameCounter, bufferSize, compress);
+    }
+
+    public MarcJsonWriter setIndex(String index, String indexType) {
+        this.index = index;
+        this.indexType = indexType;
+        return this;
     }
 
     public MarcJsonWriter setFatalErrors(boolean fatalErrors) {
@@ -167,10 +190,9 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
 
     @Override
     public void beginCollection() {
-        if (jsonlines) {
-            return;
+        if (style == Style.ARRAY) {
+            sb.append("[");
         }
-        sb.append("[");
     }
 
     @Override
@@ -230,22 +252,24 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
 
     @Override
     public void endCollection() {
-        if (jsonlines) {
-            return;
+        if (style == Style.ARRAY) {
+            sb.append("]");
+        }
+        if (style == Style.ELASTICSEARCH_BULK) {
+            // finish with line-feed
+            sb.append("\n");
         }
-        sb.append("]");
         try {
-            writer.write(sb.toString());
+            flush();
         } catch (IOException e) {
             handleException(e);
         }
-        sb.setLength(0);
     }
 
     @Override
     public void endDocument() {
         try {
-            writer.flush();
+            flush();
         } catch (IOException e) {
             handleException(e);
         }
@@ -263,8 +287,24 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
         }
         if (top) {
             top = false;
+            if (style == Style.ELASTICSEARCH_BULK) {
+                writeMetaDataLine(marcRecord);
+            }
         } else {
-            sb.append(jsonlines ? "\n" : ",");
+            switch (style) {
+                case ARRAY:
+                    sb.append(",");
+                    break;
+                case LINES:
+                    sb.append("\n");
+                    break;
+                case ELASTICSEARCH_BULK:
+                    sb.append("\n");
+                    writeMetaDataLine(marcRecord);
+                    break;
+                default:
+                    break;
+            }
         }
         sb.append("{");
         int c0 = 0;
@@ -396,6 +436,14 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
 
     @Override
     public void flush() throws IOException {
+        if (sb.length() > 0) {
+            try {
+                writer.write(sb.toString());
+            } catch (IOException e) {
+                handleException(e);
+            }
+            sb.setLength(0);
+        }
         writer.flush();
     }
 
@@ -408,7 +456,7 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
                 try {
                     endCollection();
                     close();
-                    writer = newWriter(fileNamePattern, fileNameCounter, bufferSize);
+                    newWriter(fileNamePattern, fileNameCounter, bufferSize, compress);
                     top = true;
                     beginCollection();
                 } catch (IOException e) {
@@ -418,10 +466,14 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
         }
     }
 
-    private static BufferedWriter newWriter(String fileNamePattern, AtomicInteger fileNameCounter, int bufferSize)
-            throws IOException {
-        String s = String.format(fileNamePattern, fileNameCounter.getAndIncrement());
-        return new BufferedWriter(new FileWriter(s), bufferSize);
+    private void newWriter(String fileNamePattern, AtomicInteger fileNameCounter,
+                           int bufferSize, boolean compress) throws IOException {
+        String name = String.format(fileNamePattern, fileNameCounter.getAndIncrement());
+        OutputStream out = Files.newOutputStream(Paths.get(name), StandardOpenOption.CREATE,
+                StandardOpenOption.TRUNCATE_EXISTING);
+        writer = new OutputStreamWriter(compress ?
+                new CompressedOutputStream(out, bufferSize) :
+                new BufferedOutputStream(out, bufferSize), StandardCharsets.UTF_8);
     }
 
     private static final Pattern p = Pattern.compile("\"", Pattern.LITERAL);
@@ -432,4 +484,48 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
         return p.matcher(value).replaceAll(Matcher.quoteReplacement(replacement));
     }
 
+    /**
+     * Appends an Elasticsearch bulk action line for the given record to the buffer.
+     * The document ID is the first value found under field 001. Nothing is appended
+     * unless index, index type and ID are all available.
+     *
+     * @param marcRecord the record whose 001 field supplies the document ID
+     */
+    private void writeMetaDataLine(MarcRecord marcRecord) {
+        Object object = marcRecord.get("001");
+        // step down to indicator/subfield ID levels if possible, get first value,
+        // assuming single field/value in 001; an empty level counts as "no value"
+        for (int level = 0; level < 2 && object instanceof Map; level++) {
+            Map<?, ?> map = (Map<?, ?>) object;
+            object = map.isEmpty() ? null : map.values().iterator().next();
+        }
+        // a record without 001 yields a null ID here instead of a NullPointerException
+        String id = object != null ? object.toString() : null;
+        if (index != null && indexType != null && id != null) {
+            sb.append("{\"index\":{")
+                    .append("\"_index\":\"").append(index).append("\",")
+                    .append("\"_type\":\"").append(indexType).append("\",")
+                    .append("\"_id\":\"").append(id).append("\"}}")
+                    .append("\n");
+        }
+    }
+
+    /**
+     * Gzip output stream with sync-flush enabled and the deflater set to best compression.
+     */
+    private static class CompressedOutputStream extends GZIPOutputStream {
+
+        CompressedOutputStream(OutputStream out, int size) throws IOException {
+            super(out, size, true);
+            def.setLevel(Deflater.BEST_COMPRESSION);
+        }
+    }
+
+    /**
+     * Output styles: one JSON array, one record per line, or Elasticsearch bulk format.
+     */
+    public enum Style {
+        ARRAY, LINES, ELASTICSEARCH_BULK
+    }
+
 }
diff --git a/src/test/java/org/xbib/marc/ConcurrencyTest.java b/src/test/java/org/xbib/marc/ConcurrencyTest.java
index 446e7ff..69d3668 100644
--- a/src/test/java/org/xbib/marc/ConcurrencyTest.java
+++ b/src/test/java/org/xbib/marc/ConcurrencyTest.java
@@ -129,7 +129,7 @@ public class ConcurrencyTest {
         File file = File.createTempFile(s + ".", ".jsonlines");
         file.deleteOnExit();
         FileOutputStream out = new FileOutputStream(file);
-        try (MarcJsonWriter writer = new MarcJsonWriter(out, true)
+        try (MarcJsonWriter writer = new MarcJsonWriter(out, MarcJsonWriter.Style.LINES)
                 .setFormat(MarcXchangeConstants.MARCXCHANGE_FORMAT)
                 .setType(MarcXchangeConstants.BIBLIOGRAPHIC_TYPE)
         ) {
diff --git a/src/test/java/org/xbib/marc/ZDBTest.java b/src/test/java/org/xbib/marc/ZDBTest.java
index bc43909..f2d114d 100644
--- a/src/test/java/org/xbib/marc/ZDBTest.java
+++ b/src/test/java/org/xbib/marc/ZDBTest.java
@@ -126,7 +126,7 @@ public class ZDBTest {
         OutputStream out = new FileOutputStream(file);
         MarcValueTransformers marcValueTransformers = new MarcValueTransformers();
         marcValueTransformers.setMarcValueTransformer(value -> Normalizer.normalize(value, Normalizer.Form.NFC));
-        try (MarcJsonWriter writer = new MarcJsonWriter(out, true)
+        try (MarcJsonWriter writer = new MarcJsonWriter(out, MarcJsonWriter.Style.LINES)
                 .setFormat(MarcXchangeConstants.MARCXCHANGE_FORMAT)
                 .setType(MarcXchangeConstants.BIBLIOGRAPHIC_TYPE)
                 .setMarcValueTransformers(marcValueTransformers)) {
diff --git a/src/test/java/org/xbib/marc/json/MarcJsonWriterTest.java b/src/test/java/org/xbib/marc/json/MarcJsonWriterTest.java
index 8f13c42..f6c834f 100644
--- a/src/test/java/org/xbib/marc/json/MarcJsonWriterTest.java
+++ b/src/test/java/org/xbib/marc/json/MarcJsonWriterTest.java
@@ -18,6 +18,7 @@ package org.xbib.marc.json;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.xbib.helper.StreamMatcher.assertStream;
 
@@ -143,7 +144,7 @@ public class MarcJsonWriterTest {
         File file = File.createTempFile(s + ".", ".json");
         file.deleteOnExit();
         FileOutputStream out = new FileOutputStream(file);
-        try (MarcJsonWriter writer = new MarcJsonWriter(out, true)
+        try (MarcJsonWriter writer = new MarcJsonWriter(out, MarcJsonWriter.Style.LINES)
                 .setFormat(MarcXchangeConstants.MARCXCHANGE_FORMAT)
                 .setType(MarcXchangeConstants.BIBLIOGRAPHIC_TYPE)
         ) {
@@ -168,15 +169,17 @@ public class MarcJsonWriterTest {
         InputStream in = getClass().getResource("/org/xbib/marc//" + s).openStream();
         MarcValueTransformers marcValueTransformers = new MarcValueTransformers();
         marcValueTransformers.setMarcValueTransformer(value -> Normalizer.normalize(value, Normalizer.Form.NFC));
-        MarcJsonWriter writer = new MarcJsonWriter("build/%d.json", 3);
-        writer.setMarcValueTransformers(marcValueTransformers);
-        Marc.builder()
-                .setInputStream(in)
-                .setCharset(Charset.forName("ANSEL"))
-                .setMarcListener(writer)
-                .build()
-                .writeCollection();
-        assertEquals(10, writer.getRecordCounter());
+        try (MarcJsonWriter writer = new MarcJsonWriter("build/%d.json", 3)) {
+            writer.setMarcValueTransformers(marcValueTransformers);
+            Marc.builder()
+                    .setInputStream(in)
+                    .setCharset(Charset.forName("ANSEL"))
+                    .setMarcListener(writer)
+                    .build()
+                    .writeCollection();
+            assertEquals(10, writer.getRecordCounter());
+            assertNull(writer.getException());
+        }
         File f0 = new File("build/0.json");
         assertTrue(f0.exists() && f0.length() == 6015);
         File f1 = new File("build/1.json");
@@ -189,4 +192,75 @@ public class MarcJsonWriterTest {
         assertFalse(f4.exists());
     }
 
+    @Test
+    public void elasticsearchBulkFormat() throws Exception {
+        String s = "IRMARC8.bin";
+        InputStream in = getClass().getResource("/org/xbib/marc//" + s).openStream();
+        MarcValueTransformers marcValueTransformers = new MarcValueTransformers();
+        marcValueTransformers.setMarcValueTransformer(value -> Normalizer.normalize(value, Normalizer.Form.NFC));
+        try (MarcJsonWriter writer = new MarcJsonWriter("build/bulk%d.jsonl", 3, MarcJsonWriter.Style.ELASTICSEARCH_BULK)
+                .setIndex("testindex", "testtype")) {
+            writer.setMarcValueTransformers(marcValueTransformers);
+            Marc.builder()
+                    .setFormat(MarcXchangeConstants.MARCXCHANGE_FORMAT)
+                    .setType(MarcXchangeConstants.BIBLIOGRAPHIC_TYPE)
+                    .setInputStream(in)
+                    .setCharset(Charset.forName("ANSEL"))
+                    .setMarcListener(writer)
+                    .build()
+                    .writeCollection();
+            assertNull(writer.getException());
+            assertEquals(10, writer.getRecordCounter());
+        }
+        File f0 = new File("build/bulk0.jsonl");
+        assertTrue(f0.exists() && f0.length() == 6295);
+        File f1 = new File("build/bulk1.jsonl");
+        assertTrue(f1.exists() && f1.length() == 7407);
+        File f2 = new File("build/bulk2.jsonl");
+        assertTrue(f2.exists() && f2.length() == 6706);
+        File f3 = new File("build/bulk3.jsonl");
+        assertTrue(f3.exists() && f3.length() == 2204);
+        File f4 = new File("build/bulk4.jsonl");
+        assertFalse(f4.exists());
+    }
+
+    @Test
+    public void elasticsearchBulkFormatCompressed() throws Exception {
+        String s = "IRMARC8.bin";
+        InputStream in = getClass().getResource("/org/xbib/marc//" + s).openStream();
+        MarcValueTransformers marcValueTransformers = new MarcValueTransformers();
+        marcValueTransformers.setMarcValueTransformer(value -> Normalizer.normalize(value, Normalizer.Form.NFC));
+        // split at 3, Elasticsearch bulk format, buffer size 65536, compress = true
+        try (MarcJsonWriter writer = new MarcJsonWriter("build/bulk%d.jsonl.gz", 3,
+                MarcJsonWriter.Style.ELASTICSEARCH_BULK, 65536, true)
+                .setIndex("testindex", "testtype")) {
+            writer.setMarcValueTransformers(marcValueTransformers);
+            Marc.builder()
+                    .setFormat(MarcXchangeConstants.MARCXCHANGE_FORMAT)
+                    .setType(MarcXchangeConstants.BIBLIOGRAPHIC_TYPE)
+                    .setInputStream(in)
+                    .setCharset(Charset.forName("ANSEL"))
+                    .setMarcListener(writer)
+                    .build()
+                    .writeCollection();
+            assertNull(writer.getException());
+            assertEquals(10, writer.getRecordCounter());
+        }
+        // sizes are checked after close: only then is the last gzip file finished with its trailer
+        File f0 = new File("build/bulk0.jsonl.gz");
+        assertTrue(f0.exists());
+        assertEquals(2141, f0.length());
+        File f1 = new File("build/bulk1.jsonl.gz");
+        assertTrue(f1.exists());
+        assertEquals(2605, f1.length());
+        File f2 = new File("build/bulk2.jsonl.gz");
+        assertTrue(f2.exists());
+        assertEquals(2667, f2.length());
+        File f3 = new File("build/bulk3.jsonl.gz");
+        assertTrue(f3.exists());
+        assertEquals(1031, f3.length()); // 1021 before close + final deflate block and gzip trailer
+        File f4 = new File("build/bulk4.jsonl.gz");
+        assertFalse(f4.exists());
+    }
+
 }