add Elasticsearch JSON bulk format and gzip compression to MarcJsonWriter

This commit is contained in:
Jörg Prante 2016-09-28 10:58:43 +02:00
parent 09e8bedebe
commit 080333dc16
5 changed files with 218 additions and 43 deletions

12
bin/bulk.sh Normal file
View file

@ -0,0 +1,12 @@
#!/usr/bin/env bash
# This example script sends gzip-compressed JSON lines files to the Elasticsearch bulk endpoint.
# It assumes the index settings and the mappings are already created and configured.
# Warning: bulk responses are not evaluated. To send large volumes without errors, more
# precautions have to be considered.
for f in build/*.jsonl.gz; do
    # An unmatched glob is left as the literal pattern; skip it instead of
    # handing curl a file that does not exist.
    [ -e "$f" ] || continue
    # Quote the path so that file names containing whitespace or glob characters
    # reach curl as a single argument.
    curl -XPOST -H "Accept-Encoding: gzip" -H "Content-Encoding: gzip" \
        --data-binary "@$f" --compressed localhost:9200/_bulk
done

View file

@ -24,9 +24,9 @@ import org.xbib.marc.label.RecordLabel;
import org.xbib.marc.transformer.value.MarcValueTransformers;
import org.xbib.marc.xml.MarcContentHandler;
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.FileWriter;
import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStream;
@ -34,6 +34,9 @@ import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -44,6 +47,8 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream;
/**
* This Marc Writer is a MarcContentHandler that writes Marc events to JSON.
@ -52,7 +57,7 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
private static final Logger logger = Logger.getLogger(MarcJsonWriter.class.getName());
private static final int DEFAULT_BUFFER_SIZE = 8192;
private static final int DEFAULT_BUFFER_SIZE = 65536;
public static final String LEADER_TAG = "_LEADER";
@ -64,13 +69,13 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
private final StringBuilder sb;
private BufferedWriter writer;
private Writer writer;
private Marc.Builder builder;
private boolean fatalErrors;
private boolean jsonlines;
private Style style;
private Exception exception;
@ -82,31 +87,37 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
private int bufferSize;
private boolean compress;
private String index;
private String indexType;
/**
* Flag for indicating if writer is at top of file.
*/
private boolean top;
public MarcJsonWriter(OutputStream out) throws IOException {
this(out, false);
this(out, Style.ARRAY);
}
public MarcJsonWriter(OutputStream out, boolean jsonlines) throws IOException {
this(out, DEFAULT_BUFFER_SIZE, jsonlines);
public MarcJsonWriter(OutputStream out, Style style) throws IOException {
this(out, DEFAULT_BUFFER_SIZE, style);
}
public MarcJsonWriter(OutputStream out, int bufferSize, boolean jsonlines) throws IOException {
this(new OutputStreamWriter(out, StandardCharsets.UTF_8), bufferSize, jsonlines);
public MarcJsonWriter(OutputStream out, int bufferSize, Style style) throws IOException {
this(new OutputStreamWriter(out, StandardCharsets.UTF_8), style, bufferSize);
}
public MarcJsonWriter(Writer writer) throws IOException {
this(writer, DEFAULT_BUFFER_SIZE, false);
this(writer, Style.ARRAY, DEFAULT_BUFFER_SIZE);
}
public MarcJsonWriter(Writer writer, int bufferSize, boolean jsonlines) throws IOException {
public MarcJsonWriter(Writer writer, Style style, int bufferSize) throws IOException {
this.writer = new BufferedWriter(writer, bufferSize);
this.bufferSize = bufferSize;
this.jsonlines = jsonlines;
this.style = style;
this.lock = new ReentrantLock();
this.sb = new StringBuilder();
this.builder = Marc.builder();
@ -114,20 +125,32 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
}
public MarcJsonWriter(String fileNamePattern, int splitlimit) throws IOException {
this(fileNamePattern, DEFAULT_BUFFER_SIZE, splitlimit);
this(fileNamePattern, splitlimit, Style.LINES, DEFAULT_BUFFER_SIZE, false);
}
public MarcJsonWriter(String fileNamePattern, int bufferSize, int splitlimit) throws IOException {
public MarcJsonWriter(String fileNamePattern, int splitlimit, Style style) throws IOException {
this(fileNamePattern, splitlimit, style, DEFAULT_BUFFER_SIZE, false);
}
public MarcJsonWriter(String fileNamePattern, int splitlimit, Style style, int bufferSize, boolean compress)
throws IOException {
this.fileNameCounter = new AtomicInteger(0);
this.fileNamePattern = fileNamePattern;
this.splitlimit = splitlimit;
this.writer = newWriter(fileNamePattern, fileNameCounter, bufferSize);
this.bufferSize = bufferSize;
this.lock = new ReentrantLock();
this.sb = new StringBuilder();
this.builder = Marc.builder();
this.top = true;
this.jsonlines = true;
this.style = style;
this.compress = compress;
newWriter(fileNamePattern, fileNameCounter, bufferSize, compress);
}
/**
 * Sets the index name and the index type that are written into the meta data line
 * emitted for each record in {@link Style#ELASTICSEARCH_BULK} style.
 * While either value is {@code null}, no meta data line is written.
 *
 * @param index the value for the {@code _index} attribute of the meta data line
 * @param indexType the value for the {@code _type} attribute of the meta data line
 * @return this writer, to allow call chaining
 */
public MarcJsonWriter setIndex(String index, String indexType) {
this.index = index;
this.indexType = indexType;
return this;
}
public MarcJsonWriter setFatalErrors(boolean fatalErrors) {
@ -167,11 +190,10 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
@Override
public void beginCollection() {
if (jsonlines) {
return;
}
if (style == Style.ARRAY) {
sb.append("[");
}
}
@Override
public void beginRecord(String format, String type) {
@ -230,22 +252,24 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
@Override
public void endCollection() {
if (jsonlines) {
return;
}
if (style == Style.ARRAY) {
sb.append("]");
}
if (style == Style.ELASTICSEARCH_BULK) {
// finish with line-feed
sb.append("\n");
}
try {
writer.write(sb.toString());
flush();
} catch (IOException e) {
handleException(e);
}
sb.setLength(0);
}
@Override
public void endDocument() {
try {
writer.flush();
flush();
} catch (IOException e) {
handleException(e);
}
@ -263,8 +287,24 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
}
if (top) {
top = false;
if (style == Style.ELASTICSEARCH_BULK) {
writeMetaDataLine(marcRecord);
}
} else {
sb.append(jsonlines ? "\n" : ",");
switch (style) {
case ARRAY:
sb.append(",");
break;
case LINES:
sb.append("\n");
break;
case ELASTICSEARCH_BULK:
sb.append("\n");
writeMetaDataLine(marcRecord);
break;
default:
break;
}
}
sb.append("{");
int c0 = 0;
@ -396,6 +436,14 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
/**
 * Writes any text pending in the internal string buffer to the underlying writer,
 * clears the buffer and then flushes the underlying writer.
 *
 * @throws IOException if flushing the underlying writer fails
 */
@Override
public void flush() throws IOException {
if (sb.length() > 0) {
try {
writer.write(sb.toString());
} catch (IOException e) {
// a failed write is handed to handleException instead of being rethrown here
// NOTE(review): whether handleException rethrows is not visible in this hunk - confirm
handleException(e);
}
// the buffer is cleared whether or not the write succeeded
sb.setLength(0);
}
writer.flush();
}
@ -408,7 +456,7 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
try {
endCollection();
close();
writer = newWriter(fileNamePattern, fileNameCounter, bufferSize);
newWriter(fileNamePattern, fileNameCounter, bufferSize, compress);
top = true;
beginCollection();
} catch (IOException e) {
@ -418,10 +466,14 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
}
}
private static BufferedWriter newWriter(String fileNamePattern, AtomicInteger fileNameCounter, int bufferSize)
throws IOException {
String s = String.format(fileNamePattern, fileNameCounter.getAndIncrement());
return new BufferedWriter(new FileWriter(s), bufferSize);
private void newWriter(String fileNamePattern, AtomicInteger fileNameCounter,
int bufferSize, boolean compress) throws IOException {
String name = String.format(fileNamePattern, fileNameCounter.getAndIncrement());
OutputStream out = Files.newOutputStream(Paths.get(name), StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);
writer = new OutputStreamWriter(compress ?
new CompressedOutputStream(out, bufferSize) :
new BufferedOutputStream(out, bufferSize), StandardCharsets.UTF_8);
}
private static final Pattern p = Pattern.compile("\"", Pattern.LITERAL);
@ -432,4 +484,42 @@ public class MarcJsonWriter extends MarcContentHandler implements Flushable, Clo
return p.matcher(value).replaceAll(Matcher.quoteReplacement(replacement));
}
/**
 * Appends the Elasticsearch bulk meta data ("action") line for a record to the buffer.
 * Nothing is appended while the index name or the index type is not set.
 * The document ID is taken from field 001; if the record has no usable 001 value,
 * the {@code _id} attribute is omitted so that Elasticsearch assigns an ID itself
 * and the action line / document line pairing of the bulk format stays intact.
 *
 * @param marcRecord the record that is about to be written
 */
private void writeMetaDataLine(MarcRecord marcRecord) {
    if (index == null || indexType == null) {
        // no target configured, no need to look up the record ID
        return;
    }
    Object object = marcRecord.get("001");
    // step down to indicator/subfield ID levels if possible, get first value,
    // assuming single field/value in 001
    for (int level = 0; level < 2 && object instanceof Map; level++) {
        Map<?, ?> map = (Map<?, ?>) object;
        object = map.isEmpty() ? null : map.values().iterator().next();
    }
    // a missing 001 field (or an empty level) must not raise a NullPointerException
    String id = object == null ? null : object.toString();
    // NOTE(review): index, indexType and id are appended without JSON escaping;
    // values containing '"' or '\' would produce an invalid meta data line
    sb.append("{\"index\":{")
            .append("\"_index\":\"").append(index).append("\",")
            .append("\"_type\":\"").append(indexType).append("\"");
    if (id != null) {
        sb.append(",\"_id\":\"").append(id).append("\"");
    }
    sb.append("}}").append("\n");
}
/**
 * A GZIP output stream with sync flush enabled whose deflater is switched to
 * {@link Deflater#BEST_COMPRESSION}.
 */
private static class CompressedOutputStream extends GZIPOutputStream {
CompressedOutputStream(OutputStream out, int size) throws IOException {
// the third argument enables syncFlush, so flush() also flushes the compressor
super(out, size, true);
// def is the Deflater inherited from the superclass
def.setLevel(Deflater.BEST_COMPRESSION);
}
}
/**
 * The output styles of this writer.
 * <ul>
 * <li>{@code ARRAY}: the collection is wrapped in "[" and "]", records are separated by ","</li>
 * <li>{@code LINES}: records are separated by a line feed</li>
 * <li>{@code ELASTICSEARCH_BULK}: records are separated by a line feed, a meta data line is
 * written before each record if index name and index type are set, and the collection
 * ends with a line feed</li>
 * </ul>
 */
public enum Style {
ARRAY, LINES, ELASTICSEARCH_BULK
}
}

View file

@ -129,7 +129,7 @@ public class ConcurrencyTest {
File file = File.createTempFile(s + ".", ".jsonlines");
file.deleteOnExit();
FileOutputStream out = new FileOutputStream(file);
try (MarcJsonWriter writer = new MarcJsonWriter(out, true)
try (MarcJsonWriter writer = new MarcJsonWriter(out, MarcJsonWriter.Style.LINES)
.setFormat(MarcXchangeConstants.MARCXCHANGE_FORMAT)
.setType(MarcXchangeConstants.BIBLIOGRAPHIC_TYPE)
) {

View file

@ -126,7 +126,7 @@ public class ZDBTest {
OutputStream out = new FileOutputStream(file);
MarcValueTransformers marcValueTransformers = new MarcValueTransformers();
marcValueTransformers.setMarcValueTransformer(value -> Normalizer.normalize(value, Normalizer.Form.NFC));
try (MarcJsonWriter writer = new MarcJsonWriter(out, true)
try (MarcJsonWriter writer = new MarcJsonWriter(out, MarcJsonWriter.Style.LINES)
.setFormat(MarcXchangeConstants.MARCXCHANGE_FORMAT)
.setType(MarcXchangeConstants.BIBLIOGRAPHIC_TYPE)
.setMarcValueTransformers(marcValueTransformers)) {

View file

@ -18,6 +18,7 @@ package org.xbib.marc.json;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.xbib.helper.StreamMatcher.assertStream;
@ -143,7 +144,7 @@ public class MarcJsonWriterTest {
File file = File.createTempFile(s + ".", ".json");
file.deleteOnExit();
FileOutputStream out = new FileOutputStream(file);
try (MarcJsonWriter writer = new MarcJsonWriter(out, true)
try (MarcJsonWriter writer = new MarcJsonWriter(out, MarcJsonWriter.Style.LINES)
.setFormat(MarcXchangeConstants.MARCXCHANGE_FORMAT)
.setType(MarcXchangeConstants.BIBLIOGRAPHIC_TYPE)
) {
@ -168,7 +169,7 @@ public class MarcJsonWriterTest {
InputStream in = getClass().getResource("/org/xbib/marc//" + s).openStream();
MarcValueTransformers marcValueTransformers = new MarcValueTransformers();
marcValueTransformers.setMarcValueTransformer(value -> Normalizer.normalize(value, Normalizer.Form.NFC));
MarcJsonWriter writer = new MarcJsonWriter("build/%d.json", 3);
try (MarcJsonWriter writer = new MarcJsonWriter("build/%d.json", 3)) {
writer.setMarcValueTransformers(marcValueTransformers);
Marc.builder()
.setInputStream(in)
@ -177,6 +178,8 @@ public class MarcJsonWriterTest {
.build()
.writeCollection();
assertEquals(10, writer.getRecordCounter());
assertNull(writer.getException());
}
File f0 = new File("build/0.json");
assertTrue(f0.exists() && f0.length() == 6015);
File f1 = new File("build/1.json");
@ -189,4 +192,74 @@ public class MarcJsonWriterTest {
assertFalse(f4.exists());
}
/**
 * Writes the records of IRMARC8.bin in Elasticsearch bulk format with a split limit
 * of three and checks the record count and the sizes of the files produced.
 *
 * @throws Exception if reading the input or writing the output fails
 */
@Test
public void elasticsearchBulkFormat() throws Exception {
    String s = "IRMARC8.bin";
    MarcValueTransformers marcValueTransformers = new MarcValueTransformers();
    marcValueTransformers.setMarcValueTransformer(value -> Normalizer.normalize(value, Normalizer.Form.NFC));
    // the input stream is a resource as well: close it together with the writer
    try (InputStream in = getClass().getResource("/org/xbib/marc//" + s).openStream();
         MarcJsonWriter writer = new MarcJsonWriter("build/bulk%d.jsonl", 3, MarcJsonWriter.Style.ELASTICSEARCH_BULK)
                 .setIndex("testindex", "testtype")) {
        writer.setMarcValueTransformers(marcValueTransformers);
        Marc.builder()
                .setFormat(MarcXchangeConstants.MARCXCHANGE_FORMAT)
                .setType(MarcXchangeConstants.BIBLIOGRAPHIC_TYPE)
                .setInputStream(in)
                .setCharset(Charset.forName("ANSEL"))
                .setMarcListener(writer)
                .build()
                .writeCollection();
        assertNull(writer.getException());
        assertEquals(10, writer.getRecordCounter());
    }
    // file sizes are checked after the writer is closed; existence and length are
    // asserted separately so that a failure reports the actual length
    File f0 = new File("build/bulk0.jsonl");
    assertTrue(f0.exists());
    assertEquals(6295, f0.length());
    File f1 = new File("build/bulk1.jsonl");
    assertTrue(f1.exists());
    assertEquals(7407, f1.length());
    File f2 = new File("build/bulk2.jsonl");
    assertTrue(f2.exists());
    assertEquals(6706, f2.length());
    File f3 = new File("build/bulk3.jsonl");
    assertTrue(f3.exists());
    assertEquals(2204, f3.length());
    File f4 = new File("build/bulk4.jsonl");
    assertFalse(f4.exists());
}
/**
 * Writes the records of IRMARC8.bin in gzip-compressed Elasticsearch bulk format with a
 * split limit of three and checks the record count and the sizes of the files produced.
 *
 * @throws Exception if reading the input or writing the output fails
 */
@Test
public void elasticsearchBulkFormatCompressed() throws Exception {
    String s = "IRMARC8.bin";
    MarcValueTransformers marcValueTransformers = new MarcValueTransformers();
    marcValueTransformers.setMarcValueTransformer(value -> Normalizer.normalize(value, Normalizer.Form.NFC));
    // split at 3, Elasticsearch bulk format, buffer size 65536, compress = true
    // the input stream is a resource as well: close it together with the writer
    try (InputStream in = getClass().getResource("/org/xbib/marc//" + s).openStream();
         MarcJsonWriter writer = new MarcJsonWriter("build/bulk%d.jsonl.gz", 3,
                 MarcJsonWriter.Style.ELASTICSEARCH_BULK, 65536, true)
                 .setIndex("testindex", "testtype")) {
        writer.setMarcValueTransformers(marcValueTransformers);
        Marc.builder()
                .setFormat(MarcXchangeConstants.MARCXCHANGE_FORMAT)
                .setType(MarcXchangeConstants.BIBLIOGRAPHIC_TYPE)
                .setInputStream(in)
                .setCharset(Charset.forName("ANSEL"))
                .setMarcListener(writer)
                .build()
                .writeCollection();
        assertNull(writer.getException());
        assertEquals(10, writer.getRecordCounter());
    }
    // File sizes must be checked after the writer is closed: the last file is still open
    // while the try block runs, and the final deflate block plus the GZIP trailer are only
    // written on close. Measured inside the try block, bulk3 is 10 bytes short (1021).
    File f0 = new File("build/bulk0.jsonl.gz");
    assertTrue(f0.exists());
    assertEquals(2141, f0.length());
    File f1 = new File("build/bulk1.jsonl.gz");
    assertTrue(f1.exists());
    assertEquals(2605, f1.length());
    File f2 = new File("build/bulk2.jsonl.gz");
    assertTrue(f2.exists());
    assertEquals(2667, f2.length());
    File f3 = new File("build/bulk3.jsonl.gz");
    assertTrue(f3.exists());
    assertEquals(1031, f3.length());
    File f4 = new File("build/bulk4.jsonl.gz");
    assertFalse(f4.exists());
}
}