diff --git a/NOTICE.txt b/NOTICE.txt
index 9e1ddba..b95131c 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -58,3 +58,25 @@
 https://github.com/jcustenborder/netty-codec-syslog
 as of 30 October, 2023
 License: Apache 2.0
+
+----------------
+
+The work in org.xbib.event.wal is based upon org.apache.nifi.wali
+
+https://github.com/apache/nifi/tree/main/nifi-commons/nifi-write-ahead-log
+
+as of 4 January, 2024
+
+License: Apache 2.0
+
+-----------------
+
+The work in org.xbib.event.log is based upon com.vlkan.rfos by Volkan Yazıcı
+
+https://github.com/vy/rotating-fos
+
+as of June, 2021
+
+License: Apache 2.0
+
+-----------------
diff --git a/gradle.properties b/gradle.properties
index c4c21f6..e251cbe 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,3 +1,3 @@
 group = org.xbib
 name = event
-version = 0.0.9
+version = 0.0.10
diff --git a/settings.gradle b/settings.gradle
index 4bf3fcf..410c303 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,7 +18,7 @@ dependencyResolutionManagement {
         version('gradle', '8.5')
         version('datastructures', '5.0.6')
         version('netty', '4.1.104.Final')
-        version('net', '4.0.3')
+        version('net', '4.0.4')
         library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty')
         library('net', 'org.xbib', 'net').versionRef('net')
         library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures')
diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java
index 1a2f5ff..30e3b92 100644
--- a/src/main/java/module-info.java
+++ b/src/main/java/module-info.java
@@ -6,6 +6,7 @@ module org.xbib.event {
     exports org.xbib.event.io;
     exports org.xbib.event.io.file;
     exports org.xbib.event.io.path;
+    exports org.xbib.event.log;
     exports org.xbib.event.loop.selector;
     exports org.xbib.event.loop;
     exports org.xbib.event.persistence;
@@ -13,6 +14,7 @@ module org.xbib.event {
     exports org.xbib.event.syslog;
     exports org.xbib.event.timer;
     exports org.xbib.event.util;
+    exports org.xbib.event.wal;
     exports org.xbib.event.yield;
     exports org.xbib.event;
     requires io.netty.buffer;
diff --git a/src/main/java/org/xbib/event/log/ByteCountingOutputStream.java b/src/main/java/org/xbib/event/log/ByteCountingOutputStream.java
new file mode 100644
index 0000000..b719359
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/ByteCountingOutputStream.java
@@ -0,0 +1,48 @@
+package org.xbib.event.log;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class ByteCountingOutputStream extends OutputStream {
+
+    private final OutputStream outputStream;
+
+    private long size;
+
+    public ByteCountingOutputStream(OutputStream outputStream, long size) {
+        this.outputStream = outputStream;
+        this.size = size;
+    }
+
+    public long size() {
+        return size;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        outputStream.write(b);
+        size++;
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        outputStream.write(b);
+        size += b.length;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        outputStream.write(b, off, len);
+        size += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        outputStream.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        outputStream.close();
+    }
+}
diff --git a/src/main/java/org/xbib/event/log/CharCountingWriter.java b/src/main/java/org/xbib/event/log/CharCountingWriter.java
new file mode 100644
index 0000000..4fe1b71
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/CharCountingWriter.java
@@ -0,0 +1,42 @@
+package org.xbib.event.log;
+
+import java.io.IOException;
+import java.io.Writer;
+
+public class CharCountingWriter extends Writer {
+
+    private final Writer writer;
+
+    private long size;
+
+    public CharCountingWriter(Writer writer, long size) {
+        this.writer = writer;
+        this.size = size;
+    }
+
+    public long size() {
+        return size;
+    }
+
+    @Override
+    public void write(int c) throws IOException {
+        writer.write(c);
+        size++;
+    }
+
+    @Override
+    public void write(char[] cbuf, int off, int len) throws IOException {
+        writer.write(cbuf, off, len);
+        size += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        writer.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        writer.close();
+    }
+}
diff --git a/src/main/java/org/xbib/event/log/Clock.java b/src/main/java/org/xbib/event/log/Clock.java
new file mode 100644
index 0000000..35e2ae6
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/Clock.java
@@ -0,0 +1,13 @@
+package org.xbib.event.log;
+
+import java.time.Instant;
+
+public interface Clock {
+
+    Instant now();
+
+    Instant midnight();
+
+    Instant sundayMidnight();
+
+}
diff --git a/src/main/java/org/xbib/event/log/DailyRotationPolicy.java b/src/main/java/org/xbib/event/log/DailyRotationPolicy.java
new file mode 100644
index 0000000..09d1f86
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/DailyRotationPolicy.java
@@ -0,0 +1,19 @@
+package org.xbib.event.log;
+
+import java.time.Instant;
+
+public class DailyRotationPolicy extends TimeBasedRotationPolicy {
+
+    public DailyRotationPolicy() {
+    }
+
+    @Override
+    protected Instant getTriggerInstant(Clock clock) {
+        return clock.midnight();
+    }
+
+    @Override
+    public String toString() {
+        return "DailyRotationPolicy";
+    }
+}
instant={1}, file={2}", policy, instant, path); + logger.log(Level.SEVERE, message, throwable); + } +} diff --git a/src/main/java/org/xbib/event/log/Rotatable.java b/src/main/java/org/xbib/event/log/Rotatable.java new file mode 100644 index 0000000..6edf5af --- /dev/null +++ b/src/main/java/org/xbib/event/log/Rotatable.java @@ -0,0 +1,15 @@ +package org.xbib.event.log; + +import java.io.IOException; +import java.nio.file.Path; +import java.time.Instant; + +public interface Rotatable { + + RotationConfig getConfig(); + + void rotate(RotationPolicy policy, Instant instant) throws IOException; + + void onException(RotationPolicy policy, Instant instant, Path path, Throwable throwable); + +} diff --git a/src/main/java/org/xbib/event/log/RotatingFileOutputStream.java b/src/main/java/org/xbib/event/log/RotatingFileOutputStream.java new file mode 100644 index 0000000..eb7ea49 --- /dev/null +++ b/src/main/java/org/xbib/event/log/RotatingFileOutputStream.java @@ -0,0 +1,170 @@ +package org.xbib.event.log; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.text.MessageFormat; +import java.time.Instant; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.zip.GZIPOutputStream; + +public class RotatingFileOutputStream extends OutputStream implements Rotatable { + + private static final Logger logger = Logger.getLogger(RotatingFileOutputStream.class.getName()); + + private final RotationConfig config; + + private final List writeSensitivePolicies; + + private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; + + private volatile ByteCountingOutputStream byteCountingOutputStream; + + public RotatingFileOutputStream(RotationConfig config) throws IOException { + this(config, createDefaultExecutorService()); + } + + public RotatingFileOutputStream(RotationConfig config, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) throws IOException { + this.config = config; + this.scheduledThreadPoolExecutor = scheduledThreadPoolExecutor; + this.writeSensitivePolicies = config.getPolicies().stream().filter(RotationPolicy::isWriteSensitive).collect(Collectors.toList()); + this.byteCountingOutputStream = open(null, config.getClock().now()); + startPolicies(); + } + + @Override + public void rotate(RotationPolicy policy, Instant instant) throws IOException { + Objects.requireNonNull(instant, "instant"); + RotationListener listener = config.getListener(); + listener.onTrigger(policy, instant); + byteCountingOutputStream.flush(); + if (Files.size(config.getPath()) == 0) { + logger.log(Level.FINE, MessageFormat.format("empty file, skipping rotation {path={}}", config.getPath())); + return; + } + listener.onClose(policy, instant); + byteCountingOutputStream.close(); + Path rotated = config.getFilePattern().create(instant); + logger.log(Level.FINE, MessageFormat.format("renaming {file={0}, rotatedFile={1}}", config.getPath(), rotated)); + Files.move(config.getPath(), rotated); + logger.log(Level.FINE, MessageFormat.format("re-opening file {path={0}}", config.getPath())); + byteCountingOutputStream = open(policy, instant); + if (config.isCompress()) { + compress(policy, instant, rotated, listener); + 
diff --git a/src/main/java/org/xbib/event/log/Rotatable.java b/src/main/java/org/xbib/event/log/Rotatable.java
new file mode 100644
index 0000000..6edf5af
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/Rotatable.java
@@ -0,0 +1,15 @@
+package org.xbib.event.log;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Instant;
+
+public interface Rotatable {
+
+    RotationConfig getConfig();
+
+    void rotate(RotationPolicy policy, Instant instant) throws IOException;
+
+    void onException(RotationPolicy policy, Instant instant, Path path, Throwable throwable);
+
+}
diff --git a/src/main/java/org/xbib/event/log/RotatingFileOutputStream.java b/src/main/java/org/xbib/event/log/RotatingFileOutputStream.java
new file mode 100644
index 0000000..eb7ea49
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/RotatingFileOutputStream.java
@@ -0,0 +1,172 @@
+package org.xbib.event.log;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.text.MessageFormat;
+import java.time.Instant;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPOutputStream;
+
+public class RotatingFileOutputStream extends OutputStream implements Rotatable {
+
+    private static final Logger logger = Logger.getLogger(RotatingFileOutputStream.class.getName());
+
+    private final RotationConfig config;
+
+    private final List<RotationPolicy> writeSensitivePolicies;
+
+    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+    private volatile ByteCountingOutputStream byteCountingOutputStream;
+
+    public RotatingFileOutputStream(RotationConfig config) throws IOException {
+        this(config, createDefaultExecutorService());
+    }
+
+    public RotatingFileOutputStream(RotationConfig config, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) throws IOException {
+        this.config = config;
+        this.scheduledThreadPoolExecutor = scheduledThreadPoolExecutor;
+        this.writeSensitivePolicies = config.getPolicies().stream().filter(RotationPolicy::isWriteSensitive).collect(Collectors.toList());
+        this.byteCountingOutputStream = open(null, config.getClock().now());
+        startPolicies();
+    }
+
+    @Override
+    public void rotate(RotationPolicy policy, Instant instant) throws IOException {
+        Objects.requireNonNull(instant, "instant");
+        RotationListener listener = config.getListener();
+        listener.onTrigger(policy, instant);
+        byteCountingOutputStream.flush();
+        if (Files.size(config.getPath()) == 0) {
+            logger.log(Level.FINE, MessageFormat.format("empty file, skipping rotation, path={0}", config.getPath()));
+            return;
+        }
+        listener.onClose(policy, instant);
+        byteCountingOutputStream.close();
+        Path rotated = config.getFilePattern().create(instant);
+        logger.log(Level.FINE, MessageFormat.format("renaming file={0} to rotated={1}", config.getPath(), rotated));
+        Files.move(config.getPath(), rotated);
+        logger.log(Level.FINE, MessageFormat.format("re-opening file, path={0}", config.getPath()));
+        byteCountingOutputStream = open(policy, instant);
+        if (config.isCompress()) {
+            compress(policy, instant, rotated, listener);
+            return;
+        }
+        listener.onSuccess(policy, instant, rotated);
+    }
+
+    @Override
+    public void onException(RotationPolicy policy, Instant instant, Path path, Throwable throwable) {
+        config.getListener().onFailure(policy, instant, path, throwable);
+    }
+
+    private void compress(RotationPolicy policy, Instant instant, Path rotated, RotationListener listener) {
+        scheduledThreadPoolExecutor.execute(() -> {
+            Path compressed = Paths.get(rotated + ".gz");
+            logger.log(Level.FINE, MessageFormat.format("compressing rotated={0}, compressed={1}", rotated, compressed));
+            try (InputStream inputStream = Files.newInputStream(rotated);
+                 GZIPOutputStream gzipOutputStream = new GZIPOutputStream(Files.newOutputStream(compressed))) {
+                inputStream.transferTo(gzipOutputStream);
+            } catch (Exception e) {
+                logger.log(Level.SEVERE, e.getMessage(), e);
+                listener.onFailure(policy, instant, compressed, e);
+                return;
+            }
+            logger.log(Level.FINE, MessageFormat.format("deleting old file, rotated={0}", rotated));
+            try {
+                Files.delete(rotated);
+            } catch (Exception e) {
+                logger.log(Level.SEVERE, e.getMessage(), e);
+                listener.onFailure(policy, instant, rotated, e);
+                return;
+            }
+            listener.onSuccess(policy, instant, compressed);
+        });
+    }
+
+    @Override
+    public RotationConfig getConfig() {
+        return config;
+    }
+
+    @Override
+    public synchronized void write(int b) throws IOException {
+        long byteCount = byteCountingOutputStream.size() + 1;
+        notifyWriteSensitivePolicies(byteCount);
+        byteCountingOutputStream.write(b);
+    }
+
+    @Override
+    public synchronized void write(byte[] b) throws IOException {
+        long byteCount = byteCountingOutputStream.size() + b.length;
+        notifyWriteSensitivePolicies(byteCount);
+        byteCountingOutputStream.write(b);
+    }
+
+    @Override
+    public synchronized void write(byte[] b, int off, int len) throws IOException {
+        long byteCount = byteCountingOutputStream.size() + len;
+        notifyWriteSensitivePolicies(byteCount);
+        byteCountingOutputStream.write(b, off, len);
+    }
+
+    @Override
+    public synchronized void flush() throws IOException {
+        byteCountingOutputStream.flush();
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        config.getListener().onClose(null, config.getClock().now());
+        if (byteCountingOutputStream != null) {
+            byteCountingOutputStream.close();
+            byteCountingOutputStream = null;
+        }
+    }
+
+    private static ScheduledThreadPoolExecutor createDefaultExecutorService() {
+        final AtomicInteger threadCount = new AtomicInteger();
+        return new ScheduledThreadPoolExecutor(4, runnable -> {
+            Thread thread = new Thread(runnable, "file-rotation-" + threadCount.getAndIncrement());
+            thread.setDaemon(true);
+            return thread;
+        });
+    }
+
+    private void startPolicies() {
+        for (RotationPolicy policy : config.getPolicies()) {
+            policy.start(this);
+        }
+    }
+
+    private ByteCountingOutputStream open(RotationPolicy policy, Instant instant) throws IOException {
+        OutputStream fileOutputStream = config.isAppend() && Files.exists(config.getPath()) ?
+                Files.newOutputStream(config.getPath(), StandardOpenOption.APPEND) :
+                Files.newOutputStream(config.getPath());
+        config.getListener().onOpen(policy, instant);
+        long size = config.isAppend() ? Files.size(config.getPath()) : 0;
+        return new ByteCountingOutputStream(fileOutputStream, size);
+    }
+
+    private void notifyWriteSensitivePolicies(long byteCount) {
+        for (RotationPolicy writeSensitivePolicy : writeSensitivePolicies) {
+            writeSensitivePolicy.acceptWrite(byteCount);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "RotatingFileOutputStream " + config.getPath();
+    }
+}
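
The rotation flow above is driven entirely by a RotationConfig. A minimal usage sketch; the class name RotationExample and the /tmp paths are illustrative, not part of the library:

    import org.xbib.event.log.RotatingFileOutputStream;
    import org.xbib.event.log.RotationConfig;
    import org.xbib.event.log.SizeBasedRotationPolicy;
    import java.nio.charset.StandardCharsets;

    public class RotationExample {
        public static void main(String[] args) throws Exception {
            RotationConfig config = RotationConfig.builder()
                    .path("/tmp/app.log")                             // active log file
                    .filePattern("/tmp/app-%d{yyyyMMdd-HHmmss}.log")  // target of each rotation
                    .policy(new SizeBasedRotationPolicy(1024 * 1024)) // rotate once 1 MiB is exceeded
                    .compress(true)                                   // gzip rotated files in the background
                    .build();
            try (RotatingFileOutputStream out = new RotatingFileOutputStream(config)) {
                out.write("hello\n".getBytes(StandardCharsets.UTF_8));
            }
        }
    }

The size policy is write-sensitive, so no scheduler configuration is needed here; the stream's own default executor handles the background compression.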
diff --git a/src/main/java/org/xbib/event/log/RotatingFilePattern.java b/src/main/java/org/xbib/event/log/RotatingFilePattern.java
new file mode 100644
index 0000000..b028950
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/RotatingFilePattern.java
@@ -0,0 +1,252 @@
+package org.xbib.event.log;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.TimeZone;
+
+public class RotatingFilePattern {
+
+    private static final char ESCAPE_CHAR = '%';
+
+    private static final char DATE_TIME_DIRECTIVE_CHAR = 'd';
+
+    private static final char DATE_TIME_BLOCK_START_CHAR = '{';
+
+    private static final char DATE_TIME_BLOCK_END_CHAR = '}';
+
+    private interface Field {
+
+        void render(StringBuilder builder, Instant instant);
+
+    }
+
+    private static class TextField implements Field {
+
+        private final String text;
+
+        private TextField(String text) {
+            this.text = text;
+        }
+
+        @Override
+        public void render(StringBuilder builder, Instant ignored) {
+            builder.append(text);
+        }
+
+    }
+
+    private static class DateTimeField implements Field {
+
+        private final DateTimeFormatter dateTimeFormatter;
+
+        private DateTimeField(DateTimeFormatter dateTimeFormatter) {
+            this.dateTimeFormatter = dateTimeFormatter;
+        }
+
+        @Override
+        public void render(StringBuilder builder, Instant instant) {
+            String formattedDateTime = dateTimeFormatter.format(instant);
+            builder.append(formattedDateTime);
+        }
+
+    }
+
+    private final String pattern;
+
+    private final Locale locale;
+
+    private final ZoneId timeZoneId;
+
+    private final List<Field> fields;
+
+    private RotatingFilePattern(Builder builder) {
+        this.pattern = builder.pattern;
+        this.locale = builder.locale;
+        this.timeZoneId = builder.timeZoneId;
+        this.fields = readPattern(pattern, locale, timeZoneId);
+    }
+
+    private static List<Field> readPattern(String pattern, Locale locale, ZoneId timeZoneId) {
+
+        List<Field> fields = new LinkedList<>();
+        StringBuilder textBuilder = new StringBuilder();
+        int totalCharCount = pattern.length();
+        boolean foundDateTimeDirective = false;
+        for (int charIndex = 0; charIndex < totalCharCount;) {
+
+            char c0 = pattern.charAt(charIndex);
+            if (c0 == ESCAPE_CHAR) {
+
+                // Check if escape character is escaped.
+                boolean hasOneMoreChar = (totalCharCount - charIndex - 1) > 0;
+                if (hasOneMoreChar) {
+                    char c1 = pattern.charAt(charIndex + 1);
+                    if (c1 == ESCAPE_CHAR) {
+                        textBuilder.append(c1);
+                        charIndex += 2;
+                        continue;
+                    }
+                }
+
+                // Append collected text so far, if there is any.
+                if (textBuilder.length() > 0) {
+                    String text = textBuilder.toString();
+                    TextField field = new TextField(text);
+                    fields.add(field);
+                    textBuilder = new StringBuilder();
+                }
+
+                // Try to read the directive.
+                boolean hasSufficientDateTimeChars = (totalCharCount - charIndex - 3) > 0;
+                if (hasSufficientDateTimeChars) {
+                    char c1 = pattern.charAt(charIndex + 1);
+                    if (c1 == DATE_TIME_DIRECTIVE_CHAR) {
+                        int blockStartIndex = charIndex + 2;
+                        char c2 = pattern.charAt(blockStartIndex);
+                        if (c2 == DATE_TIME_BLOCK_START_CHAR) {
+                            int blockEndIndex = pattern.indexOf(DATE_TIME_BLOCK_END_CHAR, blockStartIndex + 1);
+                            if (blockEndIndex >= 0) {
+                                String dateTimePattern = pattern.substring(blockStartIndex + 1, blockEndIndex);
+                                DateTimeFormatter dateTimeFormatter;
+                                try {
+                                    dateTimeFormatter = DateTimeFormatter
+                                            .ofPattern(dateTimePattern)
+                                            .withLocale(locale)
+                                            .withZone(timeZoneId);
+                                } catch (Exception error) {
+                                    String message = String.format(
+                                            "invalid date time pattern (position=%d, pattern=%s, dateTimePattern=%s)",
+                                            charIndex, pattern, dateTimePattern);
+                                    throw new RotatingFilePatternException(message, error);
+                                }
+                                DateTimeField dateTimeField = new DateTimeField(dateTimeFormatter);
+                                fields.add(dateTimeField);
+                                foundDateTimeDirective = true;
+                                charIndex = blockEndIndex + 1;
+                                continue;
+                            }
+                        }
+                    }
+
+                }
+
+                // Escape character leads to a dead end.
+                String message = String.format("invalid escape character (position=%d, pattern=%s)", charIndex, pattern);
+                throw new RotatingFilePatternException(message);
+
+            } else {
+                textBuilder.append(c0);
+                charIndex += 1;
+            }
+
+        }
+
+        // Append collected text so far, if there is any.
+        if (textBuilder.length() > 0) {
+            String text = textBuilder.toString();
+            TextField field = new TextField(text);
+            fields.add(field);
+        }
+
+        // Bail out if could not locate any date time directives.
+        if (!foundDateTimeDirective) {
+            String message = String.format("missing date time directive (pattern=%s)", pattern);
+            throw new RotatingFilePatternException(message);
+        }
+
+        // Return collected fields so far.
+        return fields;
+
+    }
+
+    public Path create(Instant instant) {
+        StringBuilder pathNameBuilder = new StringBuilder();
+        for (Field field : fields) {
+            field.render(pathNameBuilder, instant);
+        }
+        String pathName = pathNameBuilder.toString();
+        return Paths.get(pathName);
+    }
+
+    public String getPattern() {
+        return pattern;
+    }
+
+    public Locale getLocale() {
+        return locale;
+    }
+
+    public ZoneId getTimeZoneId() {
+        return timeZoneId;
+    }
+
+    @Override
+    public boolean equals(Object instance) {
+        if (this == instance) return true;
+        if (instance == null || getClass() != instance.getClass()) return false;
+        RotatingFilePattern that = (RotatingFilePattern) instance;
+        return Objects.equals(pattern, that.pattern) &&
+                Objects.equals(locale, that.locale) &&
+                Objects.equals(timeZoneId, that.timeZoneId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(pattern, locale, timeZoneId);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("RotatingFilePattern{pattern=%s, locale=%s, timeZoneId=%s}", pattern, locale, timeZoneId);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static final class Builder {
+
+        private String pattern;
+
+        private Locale locale = Locale.getDefault();
+
+        private ZoneId timeZoneId = TimeZone.getDefault().toZoneId();
+
+        private Builder() {}
+
+        public Builder pattern(String pattern) {
+            this.pattern = pattern;
+            return this;
+        }
+
+        public Builder locale(Locale locale) {
+            this.locale = locale;
+            return this;
+        }
+
+        public Builder timeZoneId(ZoneId timeZoneId) {
+            this.timeZoneId = timeZoneId;
+            return this;
+        }
+
+        public RotatingFilePattern build() {
+            validate();
+            return new RotatingFilePattern(this);
+        }
+
+        private void validate() {
+            Objects.requireNonNull(pattern, "pattern");
+            Objects.requireNonNull(locale, "locale");
+            Objects.requireNonNull(timeZoneId, "timeZoneId");
+        }
+
+    }
+
+}
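
readPattern accepts literal text plus %d{...} date-time directives, with %% escaping a literal percent sign; any other use of % is rejected. A small sketch of how a pattern renders into a Path (class name and paths are illustrative):

    import org.xbib.event.log.RotatingFilePattern;
    import java.nio.file.Path;
    import java.time.Instant;

    public class PatternExample {
        public static void main(String[] args) {
            RotatingFilePattern pattern = RotatingFilePattern.builder()
                    .pattern("/var/log/app-%d{yyyy-MM-dd}.log")
                    .build();
            // The %d block is rendered with the configured locale/zone; literals pass through.
            Path rotated = pattern.create(Instant.parse("2024-01-04T12:00:00Z"));
            System.out.println(rotated); // e.g. /var/log/app-2024-01-04.log, depending on the default zone
        }
    }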
diff --git a/src/main/java/org/xbib/event/log/RotatingFilePatternException.java b/src/main/java/org/xbib/event/log/RotatingFilePatternException.java
new file mode 100644
index 0000000..844a450
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/RotatingFilePatternException.java
@@ -0,0 +1,14 @@
+package org.xbib.event.log;
+
+@SuppressWarnings("serial")
+public class RotatingFilePatternException extends IllegalArgumentException {
+
+    public RotatingFilePatternException(String message) {
+        super(message);
+    }
+
+    public RotatingFilePatternException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
diff --git a/src/main/java/org/xbib/event/log/RotatingFileWriter.java b/src/main/java/org/xbib/event/log/RotatingFileWriter.java
new file mode 100644
index 0000000..acf6a49
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/RotatingFileWriter.java
@@ -0,0 +1,149 @@
+package org.xbib.event.log;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Writer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.text.MessageFormat;
+import java.time.Instant;
+import java.util.List;
+import java.util.Objects;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPOutputStream;
+
+public class RotatingFileWriter extends Writer implements Rotatable {
+
+    private static final Logger logger = Logger.getLogger(RotatingFileWriter.class.getName());
+
+    private final RotationConfig config;
+
+    private final List<RotationPolicy> writeSensitivePolicies;
+
+    private volatile CharCountingWriter charCountingWriter;
+
+    public RotatingFileWriter(RotationConfig config) throws IOException {
+        this.config = config;
+        Objects.requireNonNull(config.getScheduler(), "scheduler");
+        this.writeSensitivePolicies = config.getPolicies().stream().filter(RotationPolicy::isWriteSensitive).collect(Collectors.toList());
+        this.charCountingWriter = open(null, config.getClock().now());
+        startPolicies();
+    }
+
+    @Override
+    public void rotate(RotationPolicy policy, Instant instant) throws IOException {
+        Objects.requireNonNull(instant, "instant");
+        RotationListener listener = config.getListener();
+        listener.onTrigger(policy, instant);
+        charCountingWriter.flush();
+        if (Files.size(config.getPath()) == 0) {
+            logger.log(Level.FINE, MessageFormat.format("empty file, skipping rotation, path={0}", config.getPath()));
+            return;
+        }
+        listener.onClose(policy, instant);
+        charCountingWriter.close();
+        Path rotated = config.getFilePattern().create(instant);
+        logger.log(Level.FINE, MessageFormat.format("renaming file={0} to rotated={1}", config.getPath(), rotated));
+        Files.move(config.getPath(), rotated);
+        logger.log(Level.FINE, MessageFormat.format("re-opening path={0}", config.getPath()));
+        charCountingWriter = open(policy, instant);
+        if (config.isCompress()) {
+            compress(policy, instant, rotated, listener);
+            return;
+        }
+        listener.onSuccess(policy, instant, rotated);
+    }
+
+    @Override
+    public void onException(RotationPolicy policy, Instant instant, Path path, Throwable throwable) {
+        config.getListener().onFailure(policy, instant, path, throwable);
+    }
+
+    private void compress(RotationPolicy policy, Instant instant, Path rotated, RotationListener listener) {
+        config.getScheduler().execute(() -> {
+            Path compressed = Paths.get(rotated + ".gz");
+            logger.log(Level.FINE, MessageFormat.format("compressing rotated={0}, compressed={1}", rotated, compressed));
+            try (InputStream inputStream = Files.newInputStream(rotated);
+                 GZIPOutputStream gzipOutputStream = new GZIPOutputStream(Files.newOutputStream(compressed))) {
+                inputStream.transferTo(gzipOutputStream);
+            } catch (Exception e) {
+                logger.log(Level.SEVERE, e.getMessage(), e);
+                listener.onFailure(policy, instant, compressed, e);
+                return;
+            }
+            logger.log(Level.FINE, MessageFormat.format("deleting old file, rotated={0}", rotated));
+            try {
+                Files.delete(rotated);
+            } catch (Exception e) {
+                logger.log(Level.SEVERE, e.getMessage(), e);
+                listener.onFailure(policy, instant, rotated, e);
+                return;
+            }
+            listener.onSuccess(policy, instant, compressed);
+        });
+    }
+
+    @Override
+    public RotationConfig getConfig() {
+        return config;
+    }
+
+    @Override
+    public synchronized void write(int c) throws IOException {
+        long charCount = charCountingWriter.size() + 1;
+        notifyWriteSensitivePolicies(charCount);
+        charCountingWriter.write(c);
+    }
+
+    @Override
+    public synchronized void write(char[] cbuf, int off, int len) throws IOException {
+        long charCount = charCountingWriter.size() + len;
+        notifyWriteSensitivePolicies(charCount);
+        charCountingWriter.write(cbuf, off, len);
+    }
+
+    @Override
+    public synchronized void flush() throws IOException {
+        charCountingWriter.flush();
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        config.getListener().onClose(null, config.getClock().now());
+        if (charCountingWriter != null) {
+            charCountingWriter.close();
+            charCountingWriter = null;
+        }
+    }
+
+    private void startPolicies() {
+        for (RotationPolicy policy : config.getPolicies()) {
+            policy.start(this);
+        }
+    }
+
+    private CharCountingWriter open(RotationPolicy policy, Instant instant) throws IOException {
+        Writer writer = config.isAppend() && Files.exists(config.getPath()) ?
+                Files.newBufferedWriter(config.getPath(), StandardOpenOption.APPEND) :
+                Files.newBufferedWriter(config.getPath());
+        config.getListener().onOpen(policy, instant);
+        long size = config.isAppend() ? Files.size(config.getPath()) : 0;
+        return new CharCountingWriter(writer, size);
+    }
+
+    private void notifyWriteSensitivePolicies(long charCount) {
+        for (RotationPolicy writeSensitivePolicy : writeSensitivePolicies) {
+            writeSensitivePolicy.acceptWrite(charCount);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "RotatingFileWriter " + config.getPath();
+    }
+
+}
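
Note the constructor contract: unlike RotatingFileOutputStream, which builds its own ScheduledThreadPoolExecutor, RotatingFileWriter requires a scheduler to be present on the config. A sketch with illustrative paths and a single-threaded scheduler:

    import org.xbib.event.log.DailyRotationPolicy;
    import org.xbib.event.log.RotatingFileWriter;
    import org.xbib.event.log.RotationConfig;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;

    public class WriterExample {
        public static void main(String[] args) throws Exception {
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            RotationConfig config = RotationConfig.builder()
                    .path("/tmp/app.log")
                    .filePattern("/tmp/app-%d{yyyyMMdd}.log")
                    .policy(new DailyRotationPolicy())   // time-based, scheduled on the executor below
                    .executorService(scheduler)
                    .build();
            try (RotatingFileWriter writer = new RotatingFileWriter(config)) {
                writer.write("hello\n");
            } finally {
                scheduler.shutdown();
            }
        }
    }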
diff --git a/src/main/java/org/xbib/event/log/RotationConfig.java b/src/main/java/org/xbib/event/log/RotationConfig.java
new file mode 100644
index 0000000..d9950f2
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/RotationConfig.java
@@ -0,0 +1,197 @@
+package org.xbib.event.log;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class RotationConfig {
+
+    private final Path path;
+
+    private final RotatingFilePattern filePattern;
+
+    private final ScheduledExecutorService executorService;
+
+    private final Collection<RotationPolicy> policies;
+
+    private final boolean append;
+
+    private final boolean compress;
+
+    private final Clock clock;
+
+    private final RotationListener rotationListener;
+
+    private RotationConfig(Builder builder) {
+        this.path = builder.path;
+        this.filePattern = builder.filePattern;
+        this.executorService = builder.executorService;
+        this.policies = builder.policies;
+        this.append = builder.append;
+        this.compress = builder.compress;
+        this.clock = builder.clock;
+        this.rotationListener = builder.listener;
+    }
+
+    public Path getPath() {
+        return path;
+    }
+
+    public RotatingFilePattern getFilePattern() {
+        return filePattern;
+    }
+
+    public Collection<RotationPolicy> getPolicies() {
+        return policies;
+    }
+
+    public boolean isAppend() {
+        return append;
+    }
+
+    public boolean isCompress() {
+        return compress;
+    }
+
+    public Clock getClock() {
+        return clock;
+    }
+
+    public RotationListener getListener() {
+        return rotationListener;
+    }
+
+    public ScheduledExecutorService getScheduler() {
+        return executorService;
+    }
+
+    @Override
+    public boolean equals(Object instance) {
+        if (this == instance) return true;
+        if (instance == null || getClass() != instance.getClass()) return false;
+        RotationConfig that = (RotationConfig) instance;
+        return append == that.append &&
+                compress == that.compress &&
+                Objects.equals(path, that.path) &&
+                Objects.equals(filePattern, that.filePattern) &&
+                Objects.equals(executorService, that.executorService) &&
+                Objects.equals(policies, that.policies) &&
+                Objects.equals(clock, that.clock) &&
+                Objects.equals(rotationListener, that.rotationListener);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(path, filePattern, executorService, policies, append, compress, clock, rotationListener);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("RotationConfig{path=%s}", path);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+
+        private Path path;
+
+        private RotatingFilePattern filePattern;
+
+        private ScheduledExecutorService executorService;
+
+        private Collection<RotationPolicy> policies;
+
+        private boolean append;
+
+        private boolean compress;
+
+        private Clock clock;
+
+        private RotationListener listener;
+
+        private Builder() {
+            this.append = true;
+            this.clock = SystemClock.getInstance();
+            this.listener = LoggingRotationListener.getInstance();
+        }
+
+        public Builder path(Path path) {
+            this.path = path;
+            return this;
+        }
+
+        public Builder path(String fileName) {
+            this.path = Paths.get(fileName);
+            return this;
+        }
+
+        public Builder filePattern(RotatingFilePattern filePattern) {
+            this.filePattern = filePattern;
+            return this;
+        }
+
+        public Builder filePattern(String filePattern) {
+            this.filePattern = RotatingFilePattern.builder().pattern(filePattern).build();
+            return this;
+        }
+
+        public Builder executorService(ScheduledExecutorService executorService) {
+            this.executorService = executorService;
+            return this;
+        }
+
+        public Builder policies(Collection<RotationPolicy> policies) {
+            this.policies = policies;
+            return this;
+        }
+
+        public Builder policy(RotationPolicy policy) {
+            if (policies == null) {
+                policies = new LinkedHashSet<>();
+            }
+            policies.add(policy);
+            return this;
+        }
+
+        public Builder append(boolean append) {
+            this.append = append;
+            return this;
+        }
+
+        public Builder compress(boolean compress) {
+            this.compress = compress;
+            return this;
+        }
+
+        public Builder clock(Clock clock) {
+            this.clock = clock;
+            return this;
+        }
+
+        public Builder listener(RotationListener listener) {
+            this.listener = listener;
+            return this;
+        }
+
+        public RotationConfig build() {
+            validate();
+            return new RotationConfig(this);
+        }
+
+        private void validate() {
+            Objects.requireNonNull(path, "path");
+            Objects.requireNonNull(filePattern, "filePattern");
+            if (policies == null || policies.isEmpty()) {
+                throw new IllegalArgumentException("empty policies");
+            }
+            Objects.requireNonNull(clock, "clock");
+            Objects.requireNonNull(listener, "listener");
+        }
+    }
+}
diff --git a/src/main/java/org/xbib/event/log/RotationListener.java b/src/main/java/org/xbib/event/log/RotationListener.java
new file mode 100644
index 0000000..9079e16
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/RotationListener.java
@@ -0,0 +1,18 @@
+package org.xbib.event.log;
+
+import java.nio.file.Path;
+import java.time.Instant;
+
+public interface RotationListener {
+
+    void onTrigger(RotationPolicy policy, Instant instant);
+
+    void onOpen(RotationPolicy policy, Instant instant);
+
+    void onClose(RotationPolicy policy, Instant instant);
+
+    void onSuccess(RotationPolicy policy, Instant instant, Path path);
+
+    void onFailure(RotationPolicy policy, Instant instant, Path path, Throwable throwable);
+
+}
diff --git a/src/main/java/org/xbib/event/log/RotationPolicy.java b/src/main/java/org/xbib/event/log/RotationPolicy.java
new file mode 100644
index 0000000..48ae92c
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/RotationPolicy.java
@@ -0,0 +1,10 @@
+package org.xbib.event.log;
+
+public interface RotationPolicy {
+
+    void start(Rotatable rotatable);
+
+    boolean isWriteSensitive();
+
+    void acceptWrite(long byteCount);
+}
diff --git a/src/main/java/org/xbib/event/log/SizeBasedRotationPolicy.java b/src/main/java/org/xbib/event/log/SizeBasedRotationPolicy.java
new file mode 100644
index 0000000..b21c571
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/SizeBasedRotationPolicy.java
@@ -0,0 +1,69 @@
+package org.xbib.event.log;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class SizeBasedRotationPolicy implements RotationPolicy {
+
+    private static final Logger logger = Logger.getLogger(SizeBasedRotationPolicy.class.getName());
+
+    private final long maxByteCount;
+
+    private Rotatable rotatable;
+
+    public SizeBasedRotationPolicy(long maxByteCount) {
+        if (maxByteCount < 1) {
+            String message = String.format("invalid size {maxByteCount=%d}", maxByteCount);
+            throw new IllegalArgumentException(message);
+        }
+        this.maxByteCount = maxByteCount;
+    }
+
+    public long getMaxByteCount() {
+        return maxByteCount;
+    }
+
+    @Override
+    public boolean isWriteSensitive() {
+        return true;
+    }
+
+    @Override
+    public void acceptWrite(long byteCount) {
+        if (byteCount > maxByteCount) {
+            Instant instant = rotatable.getConfig().getClock().now();
+            try {
+                rotatable.rotate(this, instant);
+            } catch (IOException e) {
+                logger.log(Level.SEVERE, e.getMessage(), e);
+                rotatable.onException(this, instant, rotatable.getConfig().getPath(), e);
+            }
+        }
+    }
+
+    @Override
+    public void start(Rotatable rotatable) {
+        this.rotatable = rotatable;
+    }
+
+    @Override
+    public boolean equals(Object instance) {
+        if (this == instance) return true;
+        if (instance == null || getClass() != instance.getClass()) return false;
+        SizeBasedRotationPolicy that = (SizeBasedRotationPolicy) instance;
+        return maxByteCount == that.maxByteCount;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(maxByteCount);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("SizeBasedRotationPolicy{maxByteCount=%d}", maxByteCount);
+    }
+}
diff --git a/src/main/java/org/xbib/event/log/SystemClock.java b/src/main/java/org/xbib/event/log/SystemClock.java
new file mode 100644
index 0000000..6a576af
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/SystemClock.java
@@ -0,0 +1,48 @@
+package org.xbib.event.log;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+
+public class SystemClock implements Clock {
+
+    private static final SystemClock INSTANCE = new SystemClock();
+
+    public SystemClock() {
+    }
+
+    public static SystemClock getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public Instant now() {
+        return Instant.now();
+    }
+
+    @Override
+    public Instant midnight() {
+        Instant instant = now();
+        ZonedDateTime utcInstant = instant.atZone(ZoneId.of("UTC"));
+        return LocalDate
+                .from(utcInstant)
+                .atStartOfDay()
+                .plusDays(1)
+                .toInstant(ZoneOffset.UTC);
+    }
+
+    @Override
+    public Instant sundayMidnight() {
+        Instant instant = now();
+        ZonedDateTime utcInstant = instant.atZone(ZoneId.of("UTC"));
+        LocalDateTime todayStart = LocalDate.from(utcInstant).atStartOfDay();
+        int todayIndex = todayStart.getDayOfWeek().getValue() - 1;
+        int sundayOffset = 7 - todayIndex;
+        return todayStart
+                .plusDays(sundayOffset)
+                .toInstant(ZoneOffset.UTC);
+    }
+}
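
Both trigger instants are computed in UTC: midnight() is the start of the next UTC day, and sundayMidnight() the upcoming Sunday-to-Monday boundary. A time-based policy turns these into a one-shot scheduling delay (see TimeBasedRotationPolicy below); a sketch of that computation, with an illustrative class name:

    import org.xbib.event.log.Clock;
    import org.xbib.event.log.SystemClock;
    import java.time.Duration;
    import java.time.Instant;

    public class ClockExample {
        public static void main(String[] args) {
            Clock clock = SystemClock.getInstance();
            Instant now = clock.now();
            // The delay a time-based policy would hand to the scheduler.
            long millisToMidnight = Duration.between(now, clock.midnight()).toMillis();
            long millisToWeekEnd = Duration.between(now, clock.sundayMidnight()).toMillis();
            System.out.println("daily rotation fires in " + millisToMidnight + " ms");
            System.out.println("weekly rotation fires in " + millisToWeekEnd + " ms");
        }
    }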
diff --git a/src/main/java/org/xbib/event/log/TimeBasedRotationPolicy.java b/src/main/java/org/xbib/event/log/TimeBasedRotationPolicy.java
new file mode 100644
index 0000000..03568c0
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/TimeBasedRotationPolicy.java
@@ -0,0 +1,46 @@
+package org.xbib.event.log;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public abstract class TimeBasedRotationPolicy implements RotationPolicy {
+
+    private static final Logger logger = Logger.getLogger(TimeBasedRotationPolicy.class.getName());
+
+    public TimeBasedRotationPolicy() {
+    }
+
+    @Override
+    public boolean isWriteSensitive() {
+        return false;
+    }
+
+    @Override
+    public void acceptWrite(long byteCount) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void start(Rotatable rotatable) {
+        RotationConfig config = rotatable.getConfig();
+        Clock clock = config.getClock();
+        Instant currentInstant = clock.now();
+        Instant triggerInstant = getTriggerInstant(clock);
+        long triggerDelayMillis = Duration.between(currentInstant, triggerInstant).toMillis();
+        Runnable task = () -> {
+            try {
+                rotatable.rotate(TimeBasedRotationPolicy.this, triggerInstant);
+                start(rotatable);
+            } catch (Exception e) {
+                logger.log(Level.SEVERE, e.getMessage(), e);
+                rotatable.onException(this, currentInstant, rotatable.getConfig().getPath(), e);
+            }
+        };
+        config.getScheduler().schedule(task, triggerDelayMillis, TimeUnit.MILLISECONDS);
+    }
+
+    protected abstract Instant getTriggerInstant(Clock clock);
+}
diff --git a/src/main/java/org/xbib/event/log/WeeklyRotationPolicy.java b/src/main/java/org/xbib/event/log/WeeklyRotationPolicy.java
new file mode 100644
index 0000000..8f5b201
--- /dev/null
+++ b/src/main/java/org/xbib/event/log/WeeklyRotationPolicy.java
@@ -0,0 +1,19 @@
+package org.xbib.event.log;
+
+import java.time.Instant;
+
+public class WeeklyRotationPolicy extends TimeBasedRotationPolicy {
+
+    public WeeklyRotationPolicy() {
+    }
+
+    @Override
+    public Instant getTriggerInstant(Clock clock) {
+        return clock.sundayMidnight();
+    }
+
+    @Override
+    public String toString() {
+        return "WeeklyRotationPolicy";
+    }
+}
diff --git a/src/main/java/org/xbib/event/wal/BlockingQueuePool.java b/src/main/java/org/xbib/event/wal/BlockingQueuePool.java
new file mode 100644
index 0000000..98d52c0
--- /dev/null
+++ b/src/main/java/org/xbib/event/wal/BlockingQueuePool.java
@@ -0,0 +1,39 @@
+package org.xbib.event.wal;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+public class BlockingQueuePool<T> implements ObjectPool<T> {
+    private final BlockingQueue<T> queue;
+    private final Supplier<T> creationFunction;
+    private final Predicate<T> reuseCheck;
+    private final Consumer<T> returnPreparation;
+
+    public BlockingQueuePool(final int maxSize, final Supplier<T> creationFunction, final Predicate<T> reuseCheck, final Consumer<T> returnPreparation) {
+        this.queue = new LinkedBlockingQueue<>(maxSize);
+        this.creationFunction = creationFunction;
+        this.reuseCheck = reuseCheck;
+        this.returnPreparation = returnPreparation;
+    }
+
+    @Override
+    public T borrowObject() {
+        final T existing = queue.poll();
+        if (existing != null) {
+            return existing;
+        }
+
+        return creationFunction.get();
+    }
+
+    @Override
+    public void returnObject(final T somethingBorrowed) {
+        if (reuseCheck.test(somethingBorrowed)) {
+            returnPreparation.accept(somethingBorrowed);
+            queue.offer(somethingBorrowed);
+        }
+    }
+}
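
BlockingQueuePool never blocks despite its name: borrowObject() falls back to the creation function when the queue is empty, and returnObject() silently drops objects that fail the reuse check or that queue.offer rejects once the pool is full. A usage sketch pooling StringBuilder instances; the capacity limits are illustrative:

    import org.xbib.event.wal.BlockingQueuePool;

    public class PoolExample {
        public static void main(String[] args) {
            BlockingQueuePool<StringBuilder> pool = new BlockingQueuePool<>(
                    16,                                 // at most 16 pooled instances
                    StringBuilder::new,                 // create when the pool is empty
                    sb -> sb.capacity() < 64 * 1024,    // reuse only if it has not grown too large
                    sb -> sb.setLength(0));             // reset before re-pooling
            StringBuilder sb = pool.borrowObject();
            sb.append("hello");
            pool.returnObject(sb);
        }
    }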
diff --git a/src/main/java/org/xbib/event/wal/ByteArrayDataOutputStream.java b/src/main/java/org/xbib/event/wal/ByteArrayDataOutputStream.java
new file mode 100644
index 0000000..f8eaf18
--- /dev/null
+++ b/src/main/java/org/xbib/event/wal/ByteArrayDataOutputStream.java
@@ -0,0 +1,28 @@
+package org.xbib.event.wal;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
+/**
+ * A wrapper around a DataOutputStream, which wraps a ByteArrayOutputStream.
+ * This allows us to obtain the DataOutputStream itself so that we can perform
+ * writeXYZ methods and also allows us to obtain the underlying ByteArrayOutputStream
+ * for performing methods such as size(), reset(), writeTo()
+ */
+public class ByteArrayDataOutputStream {
+    private final ByteArrayOutputStream baos;
+    private final DataOutputStream dos;
+
+    public ByteArrayDataOutputStream(final int initialBufferSize) {
+        this.baos = new ByteArrayOutputStream(initialBufferSize);
+        this.dos = new DataOutputStream(baos);
+    }
+
+    public DataOutputStream getDataOutputStream() {
+        return dos;
+    }
+
+    public ByteArrayOutputStream getByteArrayOutputStream() {
+        return baos;
+    }
+}
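
A sketch of the access pattern the javadoc describes, writing through the DataOutputStream and then querying and recycling the underlying buffer (class name is illustrative):

    import org.xbib.event.wal.ByteArrayDataOutputStream;
    import java.io.IOException;

    public class BadosExample {
        public static void main(String[] args) throws IOException {
            ByteArrayDataOutputStream bados = new ByteArrayDataOutputStream(256);
            bados.getDataOutputStream().writeUTF("record");
            bados.getDataOutputStream().writeLong(42L);
            int length = bados.getByteArrayOutputStream().size(); // serialized length so far
            System.out.println("buffered " + length + " bytes");
            bados.getByteArrayOutputStream().reset();             // recycle the buffer
        }
    }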
diff --git a/src/main/java/org/xbib/event/wal/ByteCountingInputStream.java b/src/main/java/org/xbib/event/wal/ByteCountingInputStream.java
new file mode 100644
index 0000000..3b10fe0
--- /dev/null
+++ b/src/main/java/org/xbib/event/wal/ByteCountingInputStream.java
@@ -0,0 +1,104 @@
+package org.xbib.event.wal;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class ByteCountingInputStream extends InputStream {
+
+    private final InputStream in;
+    private long bytesRead = 0L;
+    private long bytesSkipped = 0L;
+
+    private long bytesReadSinceMark = 0L;
+    private long bytesSkippedSinceMark = 0L;
+
+    public ByteCountingInputStream(final InputStream in) {
+        this.in = in;
+    }
+
+    public ByteCountingInputStream(final InputStream in, final long initialOffset) {
+        this.in = in;
+        this.bytesSkipped = initialOffset;
+    }
+
+    @Override
+    public int read() throws IOException {
+        final int fromSuper = in.read();
+        if (fromSuper >= 0) {
+            bytesRead++;
+            bytesReadSinceMark++;
+        }
+        return fromSuper;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        final int fromSuper = in.read(b, off, len);
+        if (fromSuper >= 0) {
+            bytesRead += fromSuper;
+            bytesReadSinceMark += fromSuper;
+        }
+
+        return fromSuper;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        final long skipped = in.skip(n);
+        if (skipped >= 0) {
+            bytesSkipped += skipped;
+            bytesSkippedSinceMark += skipped;
+        }
+        return skipped;
+    }
+
+    public long getBytesRead() {
+        return bytesRead;
+    }
+
+    public long getBytesSkipped() {
+        return bytesSkipped;
+    }
+
+    public long getBytesConsumed() {
+        return getBytesRead() + getBytesSkipped();
+    }
+
+    @Override
+    public void mark(final int readlimit) {
+        in.mark(readlimit);
+
+        bytesReadSinceMark = 0L;
+        bytesSkippedSinceMark = 0L;
+    }
+
+    @Override
+    public boolean markSupported() {
+        return in.markSupported();
+    }
+
+    @Override
+    public void reset() throws IOException {
+        in.reset();
+        bytesRead -= bytesReadSinceMark;
+        bytesSkipped -= bytesSkippedSinceMark;
+
+        bytesReadSinceMark = 0L;
+        bytesSkippedSinceMark = 0L;
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public int available() throws IOException {
+        return in.available();
+    }
+}
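
The ...SinceMark counters exist so that reset() can roll the totals back to the point where mark() was called, keeping getBytesConsumed() accurate across retries. A sketch (class name is illustrative):

    import org.xbib.event.wal.ByteCountingInputStream;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    public class CountingExample {
        public static void main(String[] args) throws IOException {
            ByteCountingInputStream in = new ByteCountingInputStream(
                    new ByteArrayInputStream(new byte[100]));
            in.mark(100);
            in.read(new byte[10]);
            System.out.println(in.getBytesConsumed()); // 10
            in.reset();                                // rolls the counters back
            System.out.println(in.getBytesConsumed()); // 0
        }
    }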
diff --git a/src/main/java/org/xbib/event/wal/HashMapSnapshot.java b/src/main/java/org/xbib/event/wal/HashMapSnapshot.java
new file mode 100644
index 0000000..9f4b80e
--- /dev/null
+++ b/src/main/java/org/xbib/event/wal/HashMapSnapshot.java
@@ -0,0 +1,357 @@
+package org.xbib.event.wal;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T> {
+    private static final Logger logger = Logger.getLogger(HashMapSnapshot.class.getName());
+    private static final int ENCODING_VERSION = 1;
+
+    private final ConcurrentMap<Object, T> recordMap = new ConcurrentHashMap<>();
+    private final SerDeFactory<T> serdeFactory;
+    private final Set<String> swapLocations = Collections.synchronizedSet(new HashSet<>());
+    private final File storageDirectory;
+
+    public HashMapSnapshot(final File storageDirectory, final SerDeFactory<T> serdeFactory) {
+        this.serdeFactory = serdeFactory;
+        this.storageDirectory = storageDirectory;
+    }
+
+    private SnapshotHeader validateHeader(final DataInputStream dataIn) throws IOException {
+        final String snapshotClass = dataIn.readUTF();
+        logger.log(Level.FINE, () -> MessageFormat.format("Snapshot Class Name for {0} is {1}",
+                storageDirectory, snapshotClass));
+        if (!snapshotClass.equals(HashMapSnapshot.class.getName())) {
+            throw new IOException("Write-Ahead Log Snapshot located at " + storageDirectory + " was written using the "
+                    + snapshotClass + " class; cannot restore using " + getClass().getName());
+        }
+
+        final int snapshotVersion = dataIn.readInt();
+        logger.log(Level.FINE, () -> MessageFormat.format("Snapshot version for {0} is {1}",
+                storageDirectory, snapshotVersion));
+        if (snapshotVersion > getVersion()) {
+            throw new IOException("Write-Ahead Log Snapshot located at " + storageDirectory + " was written using version "
+                    + snapshotVersion + " of the " + snapshotClass + " class; cannot restore using Version " + getVersion());
+        }
+
+        final String serdeEncoding = dataIn.readUTF(); // ignore serde class name for now
+        logger.log(Level.FINE, () -> MessageFormat.format("Serde encoding for Snapshot at {0} is {1}",
+                storageDirectory, serdeEncoding));
+
+        final int serdeVersion = dataIn.readInt();
+        logger.log(Level.FINE, () -> MessageFormat.format("Serde version for Snapshot at {0} is {1}",
+                storageDirectory, serdeVersion));
+
+        final long maxTransactionId = dataIn.readLong();
+        logger.log(Level.FINE, () -> MessageFormat.format("Max Transaction ID for Snapshot at {0} is {1}",
+                storageDirectory, maxTransactionId));
+
+        final int numRecords = dataIn.readInt();
+        logger.log(Level.FINE, () -> MessageFormat.format("Number of Records for Snapshot at {0} is {1}",
+                storageDirectory, numRecords));
+
+        final SerDe<T> serde = serdeFactory.createSerDe(serdeEncoding);
+        serde.readHeader(dataIn);
+
+        return new SnapshotHeader(serde, serdeVersion, maxTransactionId, numRecords);
+    }
+
+    @Override
+    public SnapshotRecovery<T> recover() throws IOException {
+        final File partialFile = getPartialFile();
+        final File snapshotFile = getSnapshotFile();
+        final boolean partialExists = partialFile.exists();
+        final boolean snapshotExists = snapshotFile.exists();
+
+        // If there is no snapshot (which is the case before the first snapshot is ever created), then just
+        // return an empty recovery.
+        if (!partialExists && !snapshotExists) {
+            return SnapshotRecovery.emptyRecovery();
+        }
+
+        if (partialExists && snapshotExists) {
+            // both files exist -- assume NiFi crashed/died while checkpointing. Delete the partial file.
+            Files.delete(partialFile.toPath());
+        } else if (partialExists) {
+            // partial exists but snapshot does not -- we must have completed
+            // creating the partial, deleted the snapshot
+            // but crashed before renaming the partial to the snapshot. Just
+            // rename partial to snapshot
+            Files.move(partialFile.toPath(), snapshotFile.toPath());
+        }
+
+        if (snapshotFile.length() == 0) {
+            logger.log(Level.WARNING, () -> MessageFormat.format("{0} Found 0-byte Snapshot file; skipping Snapshot file in recovery",
+                    this));
+            return SnapshotRecovery.emptyRecovery();
+        }
+
+        // At this point, we know the snapshotPath exists because if it didn't, then we either returned an empty recovery
+        // or we renamed partialPath to snapshotPath. So just Recover from snapshotPath.
+        try (final DataInputStream dataIn = new DataInputStream(new BufferedInputStream(new FileInputStream(snapshotFile)))) {
+            // Ensure that the header contains the information that we expect and retrieve the relevant information from the header.
+            final SnapshotHeader header = validateHeader(dataIn);
+
+            final SerDe<T> serde = header.getSerDe();
+            final int serdeVersion = header.getSerDeVersion();
+            final int numRecords = header.getNumRecords();
+            final long maxTransactionId = header.getMaxTransactionId();
+
+            // Read all of the records that we expect to receive.
+            for (int i = 0; i < numRecords; i++) {
+                final T record = serde.deserializeRecord(dataIn, serdeVersion);
+                if (record == null) {
+                    throw new EOFException();
+                }
+
+                final UpdateType updateType = serde.getUpdateType(record);
+                if (updateType == UpdateType.DELETE) {
+                    logger.log(Level.WARNING, () -> "While recovering from snapshot, found record with type 'DELETE'; this record will not be restored");
+                    continue;
+                }
+
+                logger.log(Level.FINEST, () -> MessageFormat.format("Recovered from snapshot: {0}", record));
+                recordMap.put(serde.getRecordIdentifier(record), record);
+            }
+
+            // Determine the location of any swap files.
+            final int numSwapRecords = dataIn.readInt();
+            final Set<String> swapLocations = new HashSet<>();
+            for (int i = 0; i < numSwapRecords; i++) {
+                swapLocations.add(dataIn.readUTF());
+            }
+            this.swapLocations.addAll(swapLocations);
+
+            logger.log(Level.INFO, () -> MessageFormat.format("{0} restored {1} Records and {2} Swap Files from Snapshot, ending with Transaction ID {3}",
+                    this, numRecords, swapLocations.size(), maxTransactionId));
+
+            return new StandardSnapshotRecovery<>(recordMap, swapLocations, snapshotFile, maxTransactionId);
+        }
+    }
+
+    @Override
+    public void update(final Collection<T> records) {
+        // This implementation of Snapshot keeps a ConcurrentHashMap of all 'active' records
+        // (meaning records that have not been removed and are not swapped out), keyed by the
+        // Record Identifier. It keeps only the most up-to-date version of the Record. This allows
+        // us to write the snapshot very quickly without having to re-process the journal files.
+        // For each update, then, we will update the record in the map.
+        for (final T record : records) {
+            final Object recordId = serdeFactory.getRecordIdentifier(record);
+            final UpdateType updateType = serdeFactory.getUpdateType(record);
+
+            switch (updateType) {
+                case DELETE:
+                    recordMap.remove(recordId);
+                    break;
+                case SWAP_OUT:
+                    final String location = serdeFactory.getLocation(record);
+                    if (location == null) {
+                        logger.log(Level.INFO, () -> "Received Record (ID=" + recordId + ") with UpdateType of SWAP_OUT but "
+                                + "no indicator of where the Record is to be Swapped Out to; these records may be "
+                                + "lost when the repository is restored!");
+                    } else {
+                        recordMap.remove(recordId);
+                        this.swapLocations.add(location);
+                    }
+                    break;
+                case SWAP_IN:
+                    final String swapLocation = serdeFactory.getLocation(record);
+                    if (swapLocation == null) {
+                        logger.log(Level.SEVERE, () -> "Received Record (ID=" + recordId + ") with UpdateType of SWAP_IN but no "
+                                + "indicator of where the Record is to be Swapped In from; these records may be duplicated "
+                                + "when the repository is restored!");
+                    } else {
+                        swapLocations.remove(swapLocation);
+                    }
+                    recordMap.put(recordId, record);
+                    break;
+                default:
+                    recordMap.put(recordId, record);
+                    break;
+            }
+        }
+    }
+
+    @Override
+    public int getRecordCount() {
+        return recordMap.size();
+    }
+
+    @Override
+    public T lookup(final Object recordId) {
+        return recordMap.get(recordId);
+    }
+
+    @Override
+    public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId) {
+        return prepareSnapshot(maxTransactionId, this.swapLocations);
+    }
+
+    @Override
+    public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId, final Set<String> swapFileLocations) {
+        return new Snapshot(new HashMap<>(recordMap), new HashSet<>(swapFileLocations), maxTransactionId);
+    }
+
+    private int getVersion() {
+        return ENCODING_VERSION;
+    }
+
+    private File getPartialFile() {
+        return new File(storageDirectory, "checkpoint.partial");
+    }
+
+    private File getSnapshotFile() {
+        return new File(storageDirectory, "checkpoint");
+    }
+
+    @Override
+    public synchronized void writeSnapshot(final SnapshotCapture<T> snapshot) throws IOException {
+        final SerDe<T> serde = serdeFactory.createSerDe(null);
+
+        final File snapshotFile = getSnapshotFile();
+        final File partialFile = getPartialFile();
+
+        // We must ensure that we do not overwrite the existing Snapshot file directly because if NiFi were
+        // to be killed or crash when we are partially finished, we'd end up with no viable Snapshot file at all.
+        // To avoid this, we write to a 'partial' file, then delete the existing Snapshot file, if it exists, and
+        // rename 'partial' to Snapshot. That way, if NiFi crashes, we can still restore the Snapshot by first looking
+        // for a Snapshot file and restoring it, if it exists. If it does not exist, then we restore from the partial file,
+        // assuming that NiFi crashed after deleting the Snapshot file and before renaming the partial file.
+        //
+        // If there is no Snapshot file currently but there is a Partial File, then this indicates
+        // that we have deleted the Snapshot file and failed to rename the Partial File. We don't want
+        // to overwrite the Partial file, because doing so could potentially lose data. Instead, we must
+        // first rename it to Snapshot and then write to the partial file.
+        if (!snapshotFile.exists() && partialFile.exists()) {
+            final boolean rename = partialFile.renameTo(snapshotFile);
+            if (!rename) {
+                throw new IOException("Failed to rename partial snapshot file " + partialFile + " to " + snapshotFile);
+            }
+        }
+
+        // Write to the partial file.
+        try (final FileOutputStream fileOut = new FileOutputStream(getPartialFile());
+             final OutputStream bufferedOut = new BufferedOutputStream(fileOut);
+             final DataOutputStream dataOut = new DataOutputStream(bufferedOut)) {
+
+            // Write out the header
+            dataOut.writeUTF(HashMapSnapshot.class.getName());
+            dataOut.writeInt(getVersion());
+            dataOut.writeUTF(serde.getClass().getName());
+            dataOut.writeInt(serde.getVersion());
+            dataOut.writeLong(snapshot.getMaxTransactionId());
+            dataOut.writeInt(snapshot.getRecords().size());
+            serde.writeHeader(dataOut);
+
+            // Serialize each record
+            for (final T record : snapshot.getRecords().values()) {
+                logger.log(Level.FINEST, () -> MessageFormat.format("Checkpointing {0}", record));
+                serde.serializeRecord(record, dataOut);
+            }
+
+            // Write out the number of swap locations, followed by the swap locations themselves.
+            dataOut.writeInt(snapshot.getSwapLocations().size());
+            for (final String swapLocation : snapshot.getSwapLocations()) {
+                dataOut.writeUTF(swapLocation);
+            }
+
+            // Ensure that we flush the Buffered Output Stream and then perform an fsync().
+            // This ensures that the data is fully written to disk before we delete the existing snapshot.
+            dataOut.flush();
+            fileOut.getChannel().force(false);
+        }
+
+        // If the snapshot file exists, delete it
+        if (snapshotFile.exists()) {
+            if (!snapshotFile.delete()) {
+                logger.log(Level.WARNING, () -> "Unable to delete existing Snapshot file " + snapshotFile);
+            }
+        }
+
+        // Rename the partial file to Snapshot.
+        final boolean rename = partialFile.renameTo(snapshotFile);
+        if (!rename) {
+            throw new IOException("Failed to rename partial snapshot file " + partialFile + " to " + snapshotFile);
+        }
+    }
+
+    public class Snapshot implements SnapshotCapture<T> {
+        private final Map<Object, T> records;
+        private final long maxTransactionId;
+        private final Set<String> swapLocations;
+
+        public Snapshot(final Map<Object, T> records, final Set<String> swapLocations, final long maxTransactionId) {
+            this.records = records;
+            this.swapLocations = swapLocations;
+            this.maxTransactionId = maxTransactionId;
+        }
+
+        @Override
+        public final Map<Object, T> getRecords() {
+            return records;
+        }
+
+        @Override
+        public long getMaxTransactionId() {
+            return maxTransactionId;
+        }
+
+        @Override
+        public Set<String> getSwapLocations() {
+            return swapLocations;
+        }
+    }
+
+    private class SnapshotHeader {
+        private final SerDe<T> serde;
+        private final int serdeVersion;
+        private final int numRecords;
+        private final long maxTransactionId;
+
+        public SnapshotHeader(final SerDe<T> serde, final int serdeVersion, final long maxTransactionId, final int numRecords) {
+            this.serde = serde;
+            this.serdeVersion = serdeVersion;
+            this.maxTransactionId = maxTransactionId;
+            this.numRecords = numRecords;
+        }
+
+        public SerDe<T> getSerDe() {
+            return serde;
+        }
+
+        public int getSerDeVersion() {
+            return serdeVersion;
+        }
+
+        public long getMaxTransactionId() {
+            return maxTransactionId;
+        }
+
+        public int getNumRecords() {
+            return numRecords;
+        }
+    }
+
+}
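
The write-then-rename sequence in writeSnapshot is the standard crash-safe checkpoint idiom. A condensed sketch of the ordering it relies on, with error handling elided and illustrative names; the real implementation above also fsyncs through the file channel before publishing:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public class CheckpointSketch {
        static void checkpoint(Path snapshot, Path partial, byte[] state) throws IOException {
            // 1. Recover a dangling partial from a previous crash before touching it.
            if (!Files.exists(snapshot) && Files.exists(partial)) {
                Files.move(partial, snapshot);
            }
            // 2. Write the new state to the partial file (a real implementation also force()s the channel).
            Files.write(partial, state);
            // 3. Drop the old snapshot, then publish the partial under the snapshot name.
            Files.deleteIfExists(snapshot);
            Files.move(partial, snapshot, StandardCopyOption.ATOMIC_MOVE);
        }
    }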
100644 index 0000000..64c1cbf --- /dev/null +++ b/src/main/java/org/xbib/event/wal/JournalSummary.java @@ -0,0 +1,18 @@ +package org.xbib.event.wal; + +public interface JournalSummary { + /** + * @return the Transaction ID of the first transaction written + */ + long getFirstTransactionId(); + + /** + * @return the Transaction ID of the last transaction written + */ + long getLastTransactionId(); + + /** + * @return the number of transactions that were written to the journal + */ + int getTransactionCount(); +} diff --git a/src/main/java/org/xbib/event/wal/LengthDelimitedJournal.java b/src/main/java/org/xbib/event/wal/LengthDelimitedJournal.java new file mode 100644 index 0000000..d89c9d9 --- /dev/null +++ b/src/main/java/org/xbib/event/wal/LengthDelimitedJournal.java @@ -0,0 +1,607 @@ +package org.xbib.event.wal; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.text.DecimalFormat; +import java.text.MessageFormat; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class LengthDelimitedJournal implements WriteAheadJournal { + private static final Logger logger = Logger.getLogger(LengthDelimitedJournal.class.getName()); + private static final int DEFAULT_MAX_IN_HEAP_SERIALIZATION_BYTES = 5 * 1024 * 1024; // 5 MB + + private static final JournalSummary INACTIVE_JOURNAL_SUMMARY = new StandardJournalSummary(-1L, -1L, 0); + private static final int JOURNAL_ENCODING_VERSION = 1; + private static final byte TRANSACTION_FOLLOWS = 64; + private static final byte JOURNAL_COMPLETE = 127; + private static final int NUL_BYTE = 0; + + private final File journalFile; + private final File overflowDirectory; + private final long initialTransactionId; + private final SerDeFactory serdeFactory; + private final ObjectPool streamPool; + private final int maxInHeapSerializationBytes; + + private SerDe serde; + private FileOutputStream fileOut; + private BufferedOutputStream bufferedOut; + + private long currentTransactionId; + private int transactionCount; + private boolean headerWritten = false; + + private volatile Throwable poisonCause = null; + private volatile boolean closed = false; + private final ByteBuffer transactionPreamble = ByteBuffer.allocate(12); // guarded by synchronized block + + public LengthDelimitedJournal(final File journalFile, final SerDeFactory serdeFactory, final ObjectPool streamPool, final long initialTransactionId) { + this(journalFile, serdeFactory, streamPool, initialTransactionId, DEFAULT_MAX_IN_HEAP_SERIALIZATION_BYTES); + } + + public LengthDelimitedJournal(final File journalFile, final SerDeFactory serdeFactory, final ObjectPool streamPool, final long initialTransactionId, + final int maxInHeapSerializationBytes) { + this.journalFile = journalFile; + this.overflowDirectory = new File(journalFile.getParentFile(), "overflow-" + getBaseFilename(journalFile)); + this.serdeFactory = serdeFactory; + this.serde = 
serdeFactory.createSerDe(null); + this.streamPool = streamPool; + + this.initialTransactionId = initialTransactionId; + this.currentTransactionId = initialTransactionId; + this.maxInHeapSerializationBytes = maxInHeapSerializationBytes; + } + + public void dispose() { + logger.log(Level.FINE, () -> MessageFormat.format("Deleting Journal {0} because it is now encapsulated in the latest Snapshot", + journalFile.getName())); + if (!journalFile.delete() && journalFile.exists()) { + logger.log(Level.WARNING, () -> "Unable to delete expired journal file " + journalFile + "; this file should be deleted manually."); + } + + if (overflowDirectory.exists()) { + final File[] overflowFiles = overflowDirectory.listFiles(); + if (overflowFiles == null) { + logger.log(Level.WARNING, () -> "Unable to obtain listing of files that exist in 'overflow directory' " + overflowDirectory + + " - this directory and any files within it can now be safely removed manually"); + return; + } + + for (final File overflowFile : overflowFiles) { + if (!overflowFile.delete() && overflowFile.exists()) { + logger.log(Level.WARNING, () -> "After expiring journal file " + journalFile + ", unable to remove 'overflow file' " + overflowFile + " - this file should be removed manually"); + } + } + + if (!overflowDirectory.delete()) { + logger.log(Level.WARNING, () -> "After expiring journal file " + journalFile + ", unable to remove 'overflow directory' " + overflowDirectory + " - this file should be removed manually"); + } + } + } + + private static String getBaseFilename(final File file) { + final String name = file.getName(); + final int index = name.lastIndexOf("."); + if (index < 0) { + return name; + } + + return name.substring(0, index); + } + + private synchronized OutputStream getOutputStream() throws FileNotFoundException { + if (fileOut == null) { + fileOut = new FileOutputStream(journalFile); + bufferedOut = new BufferedOutputStream(fileOut); + } + + return bufferedOut; + } + + @Override + public synchronized boolean isHealthy() { + return !closed && !isPoisoned(); + } + + private boolean isPoisoned() { + return poisonCause != null; + } + + @Override + public synchronized void writeHeader() throws IOException { + try { + final DataOutputStream outStream = new DataOutputStream(getOutputStream()); + outStream.writeUTF(LengthDelimitedJournal.class.getName()); + outStream.writeInt(JOURNAL_ENCODING_VERSION); + + serde = serdeFactory.createSerDe(null); + outStream.writeUTF(serde.getClass().getName()); + outStream.writeInt(serde.getVersion()); + + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream dos = new DataOutputStream(baos)) { + + serde.writeHeader(dos); + dos.flush(); + + final int serdeHeaderLength = baos.size(); + outStream.writeInt(serdeHeaderLength); + baos.writeTo(outStream); + } + + outStream.flush(); + } catch (final Throwable t) { + poison(t); + + final IOException ioe = (t instanceof IOException) ? 
(IOException) t : new IOException("Failed to create journal file " + journalFile, t); + logger.log(Level.SEVERE, MessageFormat.format("Failed to create new journal file {0} due to {1}", + journalFile, ioe.toString()), ioe); + throw ioe; + } + + headerWritten = true; + } + + private synchronized SerDeAndVersion validateHeader(final DataInputStream in) throws IOException { + final String journalClassName = in.readUTF(); + logger.log(Level.FINE, () -> MessageFormat.format("Write Ahead Log Class Name for {0} is {1}", + journalFile, journalClassName)); + if (!LengthDelimitedJournal.class.getName().equals(journalClassName)) { + throw new IOException("Invalid header information - " + journalFile + " does not appear to be a valid journal file."); + } + + final int encodingVersion = in.readInt(); + logger.log(Level.FINE, () -> MessageFormat.format("Encoding version for {0} is {1}", + journalFile, encodingVersion)); + if (encodingVersion > JOURNAL_ENCODING_VERSION) { + throw new IOException("Cannot read journal file " + journalFile + " because it is encoded using version " + encodingVersion + + " but this version of the code only understands version " + JOURNAL_ENCODING_VERSION + " and below"); + } + + final String serdeClassName = in.readUTF(); + logger.log(Level.FINE, () -> MessageFormat.format("Serde Class Name for {0} is {1}", + journalFile, serdeClassName)); + final SerDe serde; + try { + serde = serdeFactory.createSerDe(serdeClassName); + } catch (final IllegalArgumentException iae) { + throw new IOException("Cannot read journal file " + journalFile + " because the serializer/deserializer used was " + serdeClassName + + " but this repository is configured to use a different type of serializer/deserializer"); + } + + final int serdeVersion = in.readInt(); + logger.log(Level.FINE, () -> "Serde version is " + serdeVersion); + if (serdeVersion > serde.getVersion()) { + throw new IOException("Cannot read journal file " + journalFile + " because it is encoded using version " + serdeVersion + + " of the serializer/deserializer but this version of the code only understands version " + serde.getVersion() + " and below"); + } + + final int serdeHeaderLength = in.readInt(); + final InputStream serdeHeaderIn = new LimitingInputStream(in, serdeHeaderLength); + final DataInputStream dis = new DataInputStream(serdeHeaderIn); + serde.readHeader(dis); + + return new SerDeAndVersion(serde, serdeVersion); + } + + + // Visible/overrideable for testing.
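+ // For example, a test can subclass the journal and override this method to make Files.createDirectories fail, + // exercising the poisoning path in update() when the overflow directory cannot be created.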
+ protected void createOverflowDirectory(final Path path) throws IOException { + Files.createDirectories(path); + } + + @Override + public void update(final Collection records, final RecordLookup recordLookup) throws IOException { + if (!headerWritten) { + throw new IllegalStateException("Cannot update journal file " + journalFile + " because no header has been written yet."); + } + + if (records.isEmpty()) { + return; + } + + checkState(); + + File overflowFile = null; + final ByteArrayDataOutputStream bados = streamPool.borrowObject(); + + try { + FileOutputStream overflowFileOut = null; + + try { + DataOutputStream dataOut = bados.getDataOutputStream(); + for (final T record : records) { + final Object recordId = serde.getRecordIdentifier(record); + final T previousRecordState = recordLookup.lookup(recordId); + serde.serializeEdit(previousRecordState, record, dataOut); + + final int size = bados.getByteArrayOutputStream().size(); + if (serde.isWriteExternalFileReferenceSupported() && size > maxInHeapSerializationBytes) { + if (!overflowDirectory.exists()) { + createOverflowDirectory(overflowDirectory.toPath()); + } + + // If we have exceeded our threshold for how much to serialize in memory, + // flush the in-memory representation to an 'overflow file' and then update + // the Data Output Stream that is used to write to the file also. + overflowFile = new File(overflowDirectory, UUID.randomUUID().toString()); + final String overflowFileName = overflowFile.getName(); + logger.log(Level.FINE, () -> MessageFormat.format("Length of update with {0} records exceeds in-memory max of {1} bytes. Overflowing to {2}", + records.size(), maxInHeapSerializationBytes, overflowFileName)); + + overflowFileOut = new FileOutputStream(overflowFile); + bados.getByteArrayOutputStream().writeTo(overflowFileOut); + bados.getByteArrayOutputStream().reset(); + + // change dataOut to point to the File's Output Stream so that all subsequent records are written to the file. + dataOut = new DataOutputStream(new BufferedOutputStream(overflowFileOut)); + + // We now need to write to the ByteArrayOutputStream a pointer to the overflow file + // so that what is written to the actual journal is that pointer. + serde.writeExternalFileReference(overflowFile, bados.getDataOutputStream()); + } + } + + dataOut.flush(); + + // If we overflowed to an external file, we need to be sure that we sync to disk before + // updating the Journal. Otherwise, we could get to a state where the Journal was flushed to disk without the + // external file being flushed. This would result in a missed update to the FlowFile Repository. 
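+ // Concretely, the ordering below is: (1) the oversized payload has already been written to the overflow file, + // (2) sync() the overflow file's descriptor so those bytes are durable, and only then (3) write and flush the + // journal entry that references the file. If (3) could happen before (2), a crash could leave the journal + // pointing at an overflow file whose contents never reached disk.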
+ if (overflowFileOut != null) { + if (logger.isLoggable(Level.FINE)) { + long l = overflowFile.length(); + logger.log(Level.FINE, () -> MessageFormat.format("Length of update to overflow file is {0} bytes", l)); + } + + overflowFileOut.getFD().sync(); + } + } finally { + if (overflowFileOut != null) { + try { + overflowFileOut.close(); + } catch (final Exception e) { + logger.log(Level.WARNING, "Failed to close open file handle to overflow file " + overflowFile.getName(), e); + } + } + } + + final ByteArrayOutputStream baos = bados.getByteArrayOutputStream(); + final OutputStream out = getOutputStream(); + + final long transactionId; + synchronized (this) { + checkState(); + + try { + transactionId = currentTransactionId++; + transactionCount++; + + transactionPreamble.clear(); + transactionPreamble.putLong(transactionId); + transactionPreamble.putInt(baos.size()); + + out.write(TRANSACTION_FOLLOWS); + out.write(transactionPreamble.array()); + baos.writeTo(out); + out.flush(); + } catch (final Throwable t) { + // While the outer Throwable that wraps this "catch" will call poison(), it is imperative that we call poison() + // before the synchronized block is exited. Otherwise, another thread could potentially corrupt the journal before + // the poison method closes the file. + poison(t); + throw t; + } + } + + logger.log(Level.FINE, () -> MessageFormat.format("Wrote Transaction {0} to journal {1} with length {2} and {3} records", + transactionId, journalFile, baos.size(), records.size())); + } catch (final Throwable t) { + poison(t); + + if (overflowFile != null) { + if (!overflowFile.delete() && overflowFile.exists()) { + final String overflowFileName = overflowFile.getName(); + logger.log(Level.WARNING, () -> "Failed to clean up temporary overflow file " + overflowFileName + " - this file should be cleaned up manually."); + } + } + + throw t; + } finally { + streamPool.returnObject(bados); + } + } + + + private void checkState() throws IOException { + final Throwable cause = this.poisonCause; + if (cause != null) { + logger.log(Level.FINE, "Cannot update Write Ahead Log because the log has already been poisoned", cause); + throw new IOException("Cannot update journal file " + journalFile + " because this journal has already encountered a failure when attempting to write to the file. " + + "If the repository is able to checkpoint, then this problem will resolve itself.
However, if the repository is unable to be checkpointed " + + "(for example, due to being out of storage space or having too many open files), then this issue may require manual intervention.", cause); + } + + if (closed) { + throw new IOException("Cannot update journal file " + journalFile + " because this journal has already been closed"); + } + } + + protected void poison(final Throwable t) { + this.poisonCause = t; + + logger.log(Level.SEVERE, MessageFormat.format("Marking Write-Ahead journal file {0} as poisoned due to {1}", + journalFile, t), t); + + try { + if (fileOut != null) { + fileOut.close(); + } + + closed = true; + } catch (final IOException innerIOE) { + t.addSuppressed(innerIOE); + } + } + + @Override + public synchronized void fsync() throws IOException { + checkState(); + + try { + if (fileOut != null) { + fileOut.getChannel().force(false); + } + } catch (final IOException ioe) { + poison(ioe); + } + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + + closed = true; + + try { + if (fileOut != null) { + if (!isPoisoned()) { + fileOut.write(JOURNAL_COMPLETE); + } + + fileOut.close(); + } + } catch (final IOException ioe) { + poison(ioe); + } + } + + @Override + public JournalRecovery recoverRecords(final Map recordMap, final Set swapLocations) throws IOException { + long maxTransactionId = -1L; + int updateCount = 0; + + boolean eofException = false; + logger.log(Level.INFO, () -> "Recovering records from journal " + journalFile); + final double journalLength = journalFile.length(); + + try (final InputStream fis = new FileInputStream(journalFile); + final InputStream bufferedIn = new BufferedInputStream(fis); + final ByteCountingInputStream byteCountingIn = new ByteCountingInputStream(bufferedIn); + final DataInputStream in = new DataInputStream(byteCountingIn)) { + + try { + // Validate that the header is what we expect and obtain the appropriate SerDe and Version information + final SerDeAndVersion serdeAndVersion = validateHeader(in); + final SerDe serde = serdeAndVersion.getSerDe(); + + // Ensure that we get a valid transaction indicator + int transactionIndicator = in.read(); + if (transactionIndicator != TRANSACTION_FOLLOWS && transactionIndicator != JOURNAL_COMPLETE && transactionIndicator != -1) { + throw new IOException("After reading " + byteCountingIn.getBytesConsumed() + " bytes from " + journalFile + ", encountered unexpected value of " + + transactionIndicator + " for the Transaction Indicator. This journal may have been corrupted."); + } + + long consumedAtLog = 0L; + + // We don't want to apply the updates in a transaction until we've finished recovering the entire + // transaction. Otherwise, we could apply, say, 8 out of 10 updates and then hit an EOF. In such a case, + // we want to roll back the entire transaction. We handle this by not updating recordMap or swapLocations + // variables directly but instead keeping track of the things that occurred and then once we've read the + // entire transaction, we can apply those updates to the recordMap and swapLocations.
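The loop that follows reads back exactly what update() wrote: one indicator byte, an eight-byte transaction ID, and a four-byte payload length ahead of the serialized edits. As a rough stand-alone sketch of that framing (inferred from the writer above; the real loop additionally bounds the payload with a LimitingInputStream and hands it to the SerDe):

import java.io.DataInputStream;
import java.io.IOException;

final class FrameScanner {

    private static final int TRANSACTION_FOLLOWS = 64;
    private static final int JOURNAL_COMPLETE = 127;

    // Reads length-delimited transaction frames until JOURNAL_COMPLETE or EOF.
    static void scan(final DataInputStream in) throws IOException {
        int indicator;
        while ((indicator = in.read()) == TRANSACTION_FOLLOWS) {
            final long transactionId = in.readLong(); // 8-byte transaction id
            final int length = in.readInt();          // 4-byte payload length
            final byte[] payload = new byte[length];
            in.readFully(payload);                    // serialized edits of one transaction
        }
        // indicator is now JOURNAL_COMPLETE (clean close) or -1 (truncated journal)
    }
}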
+ final Map transactionRecordMap = new HashMap<>(); + final Set idsRemoved = new HashSet<>(); + final Set swapLocationsRemoved = new HashSet<>(); + final Set swapLocationsAdded = new HashSet<>(); + int transactionUpdates = 0; + + // While we have a transaction to recover, recover it + while (transactionIndicator == TRANSACTION_FOLLOWS) { + transactionRecordMap.clear(); + idsRemoved.clear(); + swapLocationsRemoved.clear(); + swapLocationsAdded.clear(); + transactionUpdates = 0; + + // Format is: an 8-byte Transaction ID, followed by a 4-byte Transaction Length, followed by the serialized updates + final long transactionId = in.readLong(); + maxTransactionId = Math.max(maxTransactionId, transactionId); + final int transactionLength = in.readInt(); + + // Use SerDe to deserialize the update. We use a LimitingInputStream to ensure that the SerDe is not able to read past its intended + // length, in case there is a bug in the SerDe. We then use a ByteCountingInputStream so that we can ensure that all of the data has + // been read and throw EOFException otherwise. + final InputStream transactionLimitingIn = new LimitingInputStream(in, transactionLength); + final ByteCountingInputStream transactionByteCountingIn = new ByteCountingInputStream(transactionLimitingIn); + final DataInputStream transactionDis = new DataInputStream(transactionByteCountingIn); + + while (transactionByteCountingIn.getBytesConsumed() < transactionLength || serde.isMoreInExternalFile()) { + final T record = serde.deserializeEdit(transactionDis, recordMap, serdeAndVersion.getVersion()); + + // Update our RecordMap so that we have the most up-to-date version of the Record. + final Object recordId = serde.getRecordIdentifier(record); + final UpdateType updateType = serde.getUpdateType(record); + + switch (updateType) { + case DELETE: { + idsRemoved.add(recordId); + transactionRecordMap.remove(recordId); + break; + } + case SWAP_IN: { + final String location = serde.getLocation(record); + if (location == null) { + logger.log(Level.SEVERE, "Recovered SWAP_IN record from edit log, but it did not contain a Location; skipping record"); + } else { + swapLocationsRemoved.add(location); + swapLocationsAdded.remove(location); + transactionRecordMap.put(recordId, record); + } + break; + } + case SWAP_OUT: { + final String location = serde.getLocation(record); + if (location == null) { + logger.log(Level.SEVERE, "Recovered SWAP_OUT record from edit log, but it did not contain a Location; skipping record"); + } else { + swapLocationsRemoved.remove(location); + swapLocationsAdded.add(location); + idsRemoved.add(recordId); + transactionRecordMap.remove(recordId); + } + + break; + } + default: { + transactionRecordMap.put(recordId, record); + idsRemoved.remove(recordId); + break; + } + } + + transactionUpdates++; + } + + // Apply the transaction + for (final Object id : idsRemoved) { + recordMap.remove(id); + } + recordMap.putAll(transactionRecordMap); + swapLocations.removeAll(swapLocationsRemoved); + swapLocations.addAll(swapLocationsAdded); + updateCount += transactionUpdates; + + // Check if there is another transaction to read + transactionIndicator = in.read(); + if (transactionIndicator != TRANSACTION_FOLLOWS && transactionIndicator != JOURNAL_COMPLETE && transactionIndicator != -1) { + throw new IOException("After reading " + byteCountingIn.getBytesConsumed() + " bytes from " + journalFile + ", encountered unexpected value of " + + transactionIndicator + " for the Transaction Indicator.
This journal may have been corrupted."); + } + + // If we have a very large journal (for instance, if checkpoint is not called for a long time, or if there is a problem rolling over + // the journal), then we want to occasionally notify the user that we are, in fact, making progress, so that it doesn't appear that + // NiFi has become "stuck". + final long consumed = byteCountingIn.getBytesConsumed(); + if (consumed - consumedAtLog > 50_000_000) { + final double percentage = consumed / journalLength * 100D; + final String pct = new DecimalFormat("#.00").format(percentage); + final int count = updateCount; + logger.log(Level.INFO, () -> MessageFormat.format("{0}% of the way finished recovering journal {1}, having recovered {2} updates", + pct, journalFile, count)); + consumedAtLog = consumed; + } + } + } catch (final EOFException eof) { + eofException = true; + logger.log(Level.WARNING, () -> MessageFormat.format("Encountered unexpected End-of-File when reading journal file {0}; assuming that NiFi was shutdown unexpectedly and continuing recovery", + journalFile)); + } catch (final Exception e) { + // If the stream consists solely of NUL bytes, then we want to treat it + // the same as an EOF because we see this happen when we suddenly lose power + // while writing to a file. However, if that is not the case, then something else has gone wrong. + // In such a case, there is not much that we can do but to re-throw the Exception. + if (remainingBytesAllNul(in)) { + logger.log(Level.WARNING, "Failed to recover some of the data from Write-Ahead Log Journal because encountered trailing NUL bytes. " + + "This will sometimes happen after a sudden power loss. The rest of this journal file will be skipped for recovery purposes." + + "The following Exception was encountered while recovering the updates to the journal:", e); + } else { + throw e; + } + } + } + final int count = updateCount; + logger.log(Level.INFO, () -> MessageFormat.format("Successfully recovered {0} updates from journal {1}", count, journalFile)); + return new StandardJournalRecovery(updateCount, maxTransactionId, eofException); + } + + /** + * In the case of a sudden power loss, it is common - at least in a Linux journaling File System - + * that the partition file that is being written to will have many trailing "NUL bytes" (0's). + * If this happens, then on restart we want to treat this as an incomplete transaction, so we detect + * this case explicitly. 
+ * + * @param in the input stream to scan + * @return true if the InputStream contains no data or contains only NUL bytes + * @throws IOException if unable to read from the given InputStream + */ + private boolean remainingBytesAllNul(final InputStream in) throws IOException { + int nextByte; + while ((nextByte = in.read()) != -1) { + if (nextByte != NUL_BYTE) { + return false; + } + } + + return true; + } + + + @Override + public synchronized JournalSummary getSummary() { + if (transactionCount < 1) { + return INACTIVE_JOURNAL_SUMMARY; + } + + return new StandardJournalSummary(initialTransactionId, currentTransactionId - 1, transactionCount); + } + + private class SerDeAndVersion { + private final SerDe serde; + private final int version; + + public SerDeAndVersion(final SerDe serde, final int version) { + this.serde = serde; + this.version = version; + } + + public SerDe getSerDe() { + return serde; + } + + public int getVersion() { + return version; + } + } +} diff --git a/src/main/java/org/xbib/event/wal/LimitingInputStream.java b/src/main/java/org/xbib/event/wal/LimitingInputStream.java new file mode 100644 index 0000000..7c1a1b5 --- /dev/null +++ b/src/main/java/org/xbib/event/wal/LimitingInputStream.java @@ -0,0 +1,122 @@ +package org.xbib.event.wal; + +import java.io.IOException; +import java.io.InputStream; + +public class LimitingInputStream extends InputStream { + + private final InputStream in; + private final long limit; + private long bytesRead = 0; + private volatile boolean limitReached = false; + private long markOffset = -1L; + + /** + * Constructs a limited input stream whereby if the limit is reached all + * subsequent calls to read will return -1 and hasReachedLimit() will + * return true. The limit is inclusive, so if all 100 bytes of a 100-byte + * stream are read, it will return true; otherwise, false.
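+ * For example, wrapping a 100-byte stream in a LimitingInputStream with a limit of 10 yields + * exactly 10 bytes, after which read() behaves as if the end of the stream had been reached.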
+ * + * @param in the underlying input stream + * @param limit maximum length of bytes to read from underlying input stream + */ + public LimitingInputStream(final InputStream in, final long limit) { + this.in = in; + this.limit = limit; + } + + public boolean hasReachedLimit() throws IOException { + return limitReached; + } + + private int markLimitReached() { + limitReached = true; + return -1; + } + + @Override + public int read() throws IOException { + if (bytesRead >= limit) { + return markLimitReached(); + } + + final int val = in.read(); + if (val > -1) { + bytesRead++; + } + return val; + } + + @Override + public int read(final byte[] b) throws IOException { + if (bytesRead >= limit) { + return markLimitReached(); + } + + final int maxToRead = (int) Math.min(b.length, limit - bytesRead); + + final int val = in.read(b, 0, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (bytesRead >= limit) { + return markLimitReached(); + } + + final int maxToRead = (int) Math.min(len, limit - bytesRead); + + final int val = in.read(b, off, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public long skip(final long n) throws IOException { + final long toSkip = Math.min(n, limit - bytesRead); + final long skipped = in.skip(toSkip); + bytesRead += skipped; + return skipped; + } + + @Override + public int available() throws IOException { + return in.available(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public void mark(int readlimit) { + in.mark(readlimit); + markOffset = bytesRead; + } + + @Override + public boolean markSupported() { + return in.markSupported(); + } + + @Override + public void reset() throws IOException { + in.reset(); + + if (markOffset >= 0) { + bytesRead = markOffset; + } + markOffset = -1; + } + + public long getLimit() { + return limit; + } +} diff --git a/src/main/java/org/xbib/event/wal/ObjectPool.java b/src/main/java/org/xbib/event/wal/ObjectPool.java new file mode 100644 index 0000000..6f5105b --- /dev/null +++ b/src/main/java/org/xbib/event/wal/ObjectPool.java @@ -0,0 +1,8 @@ +package org.xbib.event.wal; + +public interface ObjectPool { + + T borrowObject(); + + void returnObject(T somethingBorrowed); +} diff --git a/src/main/java/org/xbib/event/wal/RecordLookup.java b/src/main/java/org/xbib/event/wal/RecordLookup.java new file mode 100644 index 0000000..13848ec --- /dev/null +++ b/src/main/java/org/xbib/event/wal/RecordLookup.java @@ -0,0 +1,12 @@ +package org.xbib.event.wal; + +public interface RecordLookup { + + /** + * Returns the Record with the given identifier, or null if no such record exists + * + * @param identifier the identifier of the record to lookup + * @return the Record with the given identifier, or null if no such record exists + */ + T lookup(Object identifier); +} diff --git a/src/main/java/org/xbib/event/wal/SequentialAccessWriteAheadLog.java b/src/main/java/org/xbib/event/wal/SequentialAccessWriteAheadLog.java new file mode 100644 index 0000000..092b9a1 --- /dev/null +++ b/src/main/java/org/xbib/event/wal/SequentialAccessWriteAheadLog.java @@ -0,0 +1,327 @@ +package org.xbib.event.wal; + +import java.io.File; +import java.io.IOException; +import java.text.MessageFormat; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; 
+import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.regex.Pattern; + +/** + *
+ * This implementation of WriteAheadRepository provides the ability to write all updates to the + * repository sequentially by writing to a single journal file. Serialization of data into bytes + * happens outside of any lock contention and is done using recycled byte buffers. As such, + * we incur minimal garbage collection and the theoretical throughput of this repository is equal + * to the throughput of the underlying disk itself. + *
+ * + *
+ * This implementation makes the assumption that only a single thread will ever issue updates for + * a given Record at any one time. I.e., the implementation is thread-safe but cannot guarantee + * that records are recovered correctly if two threads simultaneously update the write-ahead log + * with updates for the same record. + *
+ */ +public class SequentialAccessWriteAheadLog implements WriteAheadRepository { + private static final int PARTITION_INDEX = 0; + private static final Logger logger = Logger.getLogger(SequentialAccessWriteAheadLog.class.getName()); + private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal"); + private static final int MAX_BUFFERS = 64; + private static final int BUFFER_SIZE = 256 * 1024; + + private final File storageDirectory; + private final File journalsDirectory; + protected final SerDeFactory serdeFactory; + private final SyncListener syncListener; + private final Set recoveredSwapLocations = new HashSet<>(); + + private final ReadWriteLock journalRWLock = new ReentrantReadWriteLock(); + private final Lock journalReadLock = journalRWLock.readLock(); + private final Lock journalWriteLock = journalRWLock.writeLock(); + private final ObjectPool streamPool = new BlockingQueuePool<>(MAX_BUFFERS, + () -> new ByteArrayDataOutputStream(BUFFER_SIZE), + stream -> stream.getByteArrayOutputStream().size() < BUFFER_SIZE, + stream -> stream.getByteArrayOutputStream().reset()); + + private final WriteAheadSnapshot snapshot; + private final RecordLookup recordLookup; + private SnapshotRecovery snapshotRecovery; + + private volatile boolean recovered = false; + private WriteAheadJournal journal; + private volatile long nextTransactionId = 0L; + + public SequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory serdeFactory) throws IOException { + this(storageDirectory, serdeFactory, SyncListener.NOP_SYNC_LISTENER); + } + + public SequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory serdeFactory, final SyncListener syncListener) throws IOException { + if (!storageDirectory.exists() && !storageDirectory.mkdirs()) { + throw new IOException("Directory " + storageDirectory + " does not exist and cannot be created"); + } + if (!storageDirectory.isDirectory()) { + throw new IOException("File " + storageDirectory + " is a regular file and not a directory"); + } + + final HashMapSnapshot hashMapSnapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory); + this.snapshot = hashMapSnapshot; + this.recordLookup = hashMapSnapshot; + + this.storageDirectory = storageDirectory; + this.journalsDirectory = new File(storageDirectory, "journals"); + if (!journalsDirectory.exists() && !journalsDirectory.mkdirs()) { + throw new IOException("Directory " + journalsDirectory + " does not exist and cannot be created"); + } + + recovered = false; + + this.serdeFactory = serdeFactory; + this.syncListener = (syncListener == null) ? 
SyncListener.NOP_SYNC_LISTENER : syncListener; + } + + @Override + public int update(final Collection records, final boolean forceSync) throws IOException { + if (!recovered) { + throw new IllegalStateException("Cannot update repository until record recovery has been performed"); + } + + journalReadLock.lock(); + try { + journal.update(records, recordLookup); + + if (forceSync) { + journal.fsync(); + syncListener.onSync(PARTITION_INDEX); + } + + snapshot.update(records); + } finally { + journalReadLock.unlock(); + } + + return PARTITION_INDEX; + } + + @Override + public synchronized Collection recoverRecords() throws IOException { + if (recovered) { + throw new IllegalStateException("Cannot recover records from repository because record recovery has already commenced"); + } + + logger.log(Level.INFO, () -> MessageFormat.format("Recovering records from Write-Ahead Log at {0}", + storageDirectory)); + + final long recoverStart = System.nanoTime(); + recovered = true; + snapshotRecovery = snapshot.recover(); + this.recoveredSwapLocations.addAll(snapshotRecovery.getRecoveredSwapLocations()); + + final long snapshotRecoveryMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - recoverStart); + + final Map recoveredRecords = snapshotRecovery.getRecords(); + final Set swapLocations = snapshotRecovery.getRecoveredSwapLocations(); + + final File[] journalFiles = journalsDirectory.listFiles(this::isJournalFile); + if (journalFiles == null) { + throw new IOException("Cannot access the list of files in directory " + journalsDirectory + + "; please ensure that appropriate file permissions are set."); + } + + if (snapshotRecovery.getRecoveryFile() == null) { + logger.log(Level.INFO, () -> MessageFormat.format("No Snapshot File to recover from at {0}. Now recovering records from {1} journal files", + storageDirectory, journalFiles.length)); + } else { + logger.log(Level.INFO, () -> MessageFormat.format("Successfully recovered {0} records and {1} swap files from Snapshot at {2} with Max Transaction ID of {3} in {4} milliseconds. 
Now recovering records from {5} journal files", + recoveredRecords.size(), swapLocations.size(), snapshotRecovery.getRecoveryFile(), snapshotRecovery.getMaxTransactionId(), + snapshotRecoveryMillis, journalFiles.length)); + } + + final List orderedJournalFiles = Arrays.asList(journalFiles); + orderedJournalFiles.sort((o1, o2) -> { + final long transactionId1 = getMinTransactionId(o1); + final long transactionId2 = getMinTransactionId(o2); + + return Long.compare(transactionId1, transactionId2); + }); + + final long snapshotTransactionId = snapshotRecovery.getMaxTransactionId(); + + int totalUpdates = 0; + int journalFilesRecovered = 0; + int journalFilesSkipped = 0; + long maxTransactionId = snapshotTransactionId; + + for (final File journalFile : orderedJournalFiles) { + final long journalMinTransactionId = getMinTransactionId(journalFile); + if (journalMinTransactionId < snapshotTransactionId) { + logger.log(Level.FINE, () -> MessageFormat.format("Will not recover records from journal file {0} because the minimum Transaction ID for that journal is {1} and the Transaction ID recovered from Snapshot was {2}", + journalFile, journalMinTransactionId, snapshotTransactionId)); + + journalFilesSkipped++; + continue; + } + + logger.log(Level.FINE, () -> MessageFormat.format("Min Transaction ID for journal {0} is {1}, so will recover records from journal", + journalFile, journalMinTransactionId)); + journalFilesRecovered++; + + try (final WriteAheadJournal journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) { + final JournalRecovery journalRecovery = journal.recoverRecords(recoveredRecords, swapLocations); + final int updates = journalRecovery.getUpdateCount(); + + logger.log(Level.FINE, () -> MessageFormat.format("Recovered {0} updates from journal {1}", + updates, journalFile)); + totalUpdates += updates; + maxTransactionId = Math.max(maxTransactionId, journalRecovery.getMaxTransactionId()); + } + } + + logger.log(Level.FINE, MessageFormat.format("Recovered {0} updates from {1} journal files and skipped {2} journal files because their data was already encapsulated in the snapshot", + totalUpdates, journalFilesRecovered, journalFilesSkipped)); + this.nextTransactionId = maxTransactionId + 1; + + final long recoverNanos = System.nanoTime() - recoverStart; + final long recoveryMillis = TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS); + logger.log(Level.INFO, () -> MessageFormat.format("Successfully recovered {0} records in {1} milliseconds. 
Now checkpointing to ensure that Write-Ahead Log is in a consistent state", + recoveredRecords.size(), recoveryMillis)); + + this.recoveredSwapLocations.addAll(swapLocations); + + checkpoint(this.recoveredSwapLocations); + + return recoveredRecords.values(); + } + + private long getMinTransactionId(final File journalFile) { + final String filename = journalFile.getName(); + final String numeral = filename.substring(0, filename.indexOf(".")); + return Long.parseLong(numeral); + } + + private boolean isJournalFile(final File file) { + if (!file.isFile()) { + return false; + } + + final String filename = file.getName(); + return JOURNAL_FILENAME_PATTERN.matcher(filename).matches(); + } + + @Override + public synchronized Set getRecoveredSwapLocations() throws IOException { + if (!recovered) { + throw new IllegalStateException("Cannot retrieve the Recovered Swap Locations until record recovery has been performed"); + } + + return Collections.unmodifiableSet(this.recoveredSwapLocations); + } + + public SnapshotCapture captureSnapshot() { + return snapshot.prepareSnapshot(nextTransactionId - 1); + } + + @Override + public int checkpoint() throws IOException { + return checkpoint(null); + } + + private int checkpoint(final Set swapLocations) throws IOException { + final SnapshotCapture snapshotCapture; + + final long startNanos = System.nanoTime(); + final File[] existingJournals; + journalWriteLock.lock(); + try { + if (journal != null) { + final JournalSummary journalSummary = journal.getSummary(); + if (journalSummary.getTransactionCount() == 0 && journal.isHealthy()) { + logger.log(Level.FINE, "Will not checkpoint Write-Ahead Log because no updates have occurred since last checkpoint"); + return snapshot.getRecordCount(); + } + + try { + journal.fsync(); + } catch (final Exception e) { + logger.log(Level.SEVERE, "Failed to sync Write-Ahead Log's journal to disk at " + storageDirectory, e); + } + + try { + journal.close(); + } catch (final Exception e) { + logger.log(Level.SEVERE, "Failed to close Journal while attempting to checkpoint Write-Ahead Log at " + storageDirectory, e); + } + + nextTransactionId = Math.max(nextTransactionId, journalSummary.getLastTransactionId() + 1); + } + + syncListener.onGlobalSync(); + + final File[] existingFiles = journalsDirectory.listFiles(this::isJournalFile); + existingJournals = (existingFiles == null) ? new File[0] : existingFiles; + + if (swapLocations == null) { + snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1); + } else { + snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1, swapLocations); + } + + + // Create a new journal. We name the journal file <nextTransactionId>.journal but it is possible + // that we could have an empty journal file already created. If this happens, we don't want to create + // a new file on top of it because it would get deleted below when we clean up old journals. So we + // will simply increment our transaction ID and try again.
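+ // For example, with nextTransactionId = 42 the new journal becomes journals/42.journal; if a stale, + // empty 42.journal already exists, 43.journal is tried next, and so on.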
+ File journalFile = new File(journalsDirectory, nextTransactionId + ".journal"); + while (journalFile.exists()) { + nextTransactionId++; + journalFile = new File(journalsDirectory, nextTransactionId + ".journal"); + } + + journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, nextTransactionId); + journal.writeHeader(); + + logger.log(Level.FINE, () -> MessageFormat.format("Created new Journal starting with Transaction ID {0}", nextTransactionId)); + } finally { + journalWriteLock.unlock(); + } + + final long stopTheWorldMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + snapshot.writeSnapshot(snapshotCapture); + + for (final File existingJournal : existingJournals) { + final WriteAheadJournal journal = new LengthDelimitedJournal<>(existingJournal, serdeFactory, streamPool, nextTransactionId); + journal.dispose(); + } + + final long totalNanos = System.nanoTime() - startNanos; + final long millis = TimeUnit.NANOSECONDS.toMillis(totalNanos); + logger.log(Level.INFO, () -> MessageFormat.format("Checkpointed Write-Ahead Log with {0} Records and {1} Swap Files in {2} milliseconds (Stop-the-world time = {3} milliseconds), max Transaction ID {4}", + snapshotCapture.getRecords().size(), snapshotCapture.getSwapLocations().size(), millis, stopTheWorldMillis, snapshotCapture.getMaxTransactionId())); + + return snapshotCapture.getRecords().size(); + } + + + @Override + public void shutdown() throws IOException { + journalWriteLock.lock(); + try { + if (journal != null) { + journal.close(); + } + } finally { + journalWriteLock.unlock(); + } + } +} diff --git a/src/main/java/org/xbib/event/wal/SerDe.java b/src/main/java/org/xbib/event/wal/SerDe.java new file mode 100644 index 0000000..5bc6987 --- /dev/null +++ b/src/main/java/org/xbib/event/wal/SerDe.java @@ -0,0 +1,172 @@ +package org.xbib.event.wal; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.Map; + +/** + * A mechanism for Serializing and De-Serializing a Record of a given Type + * + * @param <T> the type of record that is to be Serialized and De-Serialized by + * this object + */ +public interface SerDe { + + /** + * Provides the SerDe a chance to write header information to the given output stream + * + * @param out the DataOutputStream to write to + * @throws IOException if unable to write to the OutputStream + */ + default void writeHeader(DataOutputStream out) throws IOException { + } + + /** + *
+ * Serializes an Edit Record to the log via the given + * {@link DataOutputStream}. + *
+ * + * @param previousRecordState previous state + * @param newRecordState new state + * @param out stream to write to + * @throws IOException if fail during write + */ + void serializeEdit(T previousRecordState, T newRecordState, DataOutputStream out) throws IOException; + + /** + *
+ * Serializes a Record in a form suitable for a Snapshot via the given + * {@link DataOutputStream}. + *
+ * + * @param record to serialize + * @param out to write to + * @throws IOException if failed to write + */ + void serializeRecord(T record, DataOutputStream out) throws IOException; + + /** + * Provides the SerDe the opportunity to read header information before deserializing any records + * + * @param in the InputStream to read from + * @throws IOException if unable to read from the InputStream + */ + default void readHeader(DataInputStream in) throws IOException { + } + + /** + *
+ * Reads an Edit Record from the given {@link DataInputStream} and merges + * that edit with the current version of the record, returning the new, + * merged version. If the Edit Record indicates that the entity was deleted, + * must return a Record with an UpdateType of {@link UpdateType#DELETE}. + * This method must never return null. + *
+ * + * @param in to deserialize from + * @param currentRecordStates an unmodifiable map of Record ID's to the + * current state of that record + * @param version the version of the SerDe that was used to serialize the + * edit record + * @return deserialized record + * @throws IOException if failure reading + */ + T deserializeEdit(DataInputStream in, Map currentRecordStates, int version) throws IOException; + + /** + *
+ * Reads a Record from the given {@link DataInputStream} and returns this + * record. If no data is available, returns null. + *
+ * + * @param in stream to read from + * @param version the version of the SerDe that was used to serialize the + * record + * @return record + * @throws IOException failure reading + */ + T deserializeRecord(DataInputStream in, int version) throws IOException; + + /** + * Returns the unique ID for the given record + * + * @param record to obtain identifier for + * @return identifier of record + */ + Object getRecordIdentifier(T record); + + /** + * Returns the UpdateType for the given record + * + * @param record to retrieve update type for + * @return update type + */ + UpdateType getUpdateType(T record); + + /** + * Returns the external location of the given record; this is used when a + * record is moved away from WALI or is being re-introduced to WALI. For + * example, WALI can be updated with a record of type + * {@link UpdateType#SWAP_OUT} that indicates a Location of + * file://tmp/external1 and can then be re-introduced to WALI by updating + * WALI with a record of type {@link UpdateType#CREATE} that indicates a + * Location of file://tmp/external1 + * + * @param record to get location of + * @return location + */ + String getLocation(T record); + + /** + * Returns the version that this SerDe will use when writing. This is used + * when serializing/deserializing the edit logs so that if the version + * changes, we are still able to deserialize old versions + * + * @return version + */ + int getVersion(); + + /** + * Closes any resources that the SerDe is holding open + * + * @throws IOException if unable to close resources + */ + default void close() throws IOException { + } + + /** + * Optional method that a SerDe can support that indicates that the contents of the next update should be found + * in the given external File. + * + * @param externalFile the file that contains the update information + * @param out the DataOutputStream to write the external file reference to + * @throws IOException if unable to write the update + * @throws UnsupportedOperationException if this SerDe does not support this operation + */ + default void writeExternalFileReference(File externalFile, DataOutputStream out) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Indicates whether or not a call to {@link #writeExternalFileReference(File, DataOutputStream)} is valid for this implementation + * @return true if calls to {@link #writeExternalFileReference(File, DataOutputStream)} are supported, false if calling + * the method will result in an {@link UnsupportedOperationException} being thrown. + */ + default boolean isWriteExternalFileReferenceSupported() { + return false; + } + + /** + * If the last call to read data from this SerDe resulted in data being read from an External File, and there is more data in that External File, + * then this method will return true. Otherwise, it will return false. + * + * @return true if more data available in External File, false otherwise.
+ * @throws IOException if unable to read from External File to determine data availability + */ + default boolean isMoreInExternalFile() throws IOException { + return false; + } +} diff --git a/src/main/java/org/xbib/event/wal/SerDeFactory.java b/src/main/java/org/xbib/event/wal/SerDeFactory.java new file mode 100644 index 0000000..ecf1fc9 --- /dev/null +++ b/src/main/java/org/xbib/event/wal/SerDeFactory.java @@ -0,0 +1,43 @@ +package org.xbib.event.wal; + +public interface SerDeFactory { + + /** + * Returns a new SerDe + * + * @param encodingName the name of encoding that was used when writing the serialized data, or null if + * the SerDe is to be used for serialization purposes + * @return a SerDe + */ + SerDe createSerDe(String encodingName); + + /** + * Returns the unique ID for the given record + * + * @param record to obtain identifier for + * @return identifier of record + */ + Object getRecordIdentifier(T record); + + /** + * Returns the UpdateType for the given record + * + * @param record to retrieve update type for + * @return update type + */ + UpdateType getUpdateType(T record); + + /** + * Returns the external location of the given record; this is used when a + * record is moved away from WALI or is being re-introduced to WALI. For + * example, WALI can be updated with a record of type + * {@link UpdateType#SWAP_OUT} that indicates a Location of + * file://tmp/external1 and can then be re-introduced to WALI by updating + * WALI with a record of type {@link UpdateType#CREATE} that indicates a + * Location of file://tmp/external1 + * + * @param record to get location of + * @return location + */ + String getLocation(T record); +} diff --git a/src/main/java/org/xbib/event/wal/SnapshotCapture.java b/src/main/java/org/xbib/event/wal/SnapshotCapture.java new file mode 100644 index 0000000..f5aa323 --- /dev/null +++ b/src/main/java/org/xbib/event/wal/SnapshotCapture.java @@ -0,0 +1,12 @@ +package org.xbib.event.wal; + +import java.util.Map; +import java.util.Set; + +public interface SnapshotCapture { + Map getRecords(); + + long getMaxTransactionId(); + + Set getSwapLocations(); +} diff --git a/src/main/java/org/xbib/event/wal/SnapshotRecovery.java b/src/main/java/org/xbib/event/wal/SnapshotRecovery.java new file mode 100644 index 0000000..a127da6 --- /dev/null +++ b/src/main/java/org/xbib/event/wal/SnapshotRecovery.java @@ -0,0 +1,41 @@ +package org.xbib.event.wal; + +import java.io.File; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +public interface SnapshotRecovery { + long getMaxTransactionId(); + + Map getRecords(); + + Set getRecoveredSwapLocations(); + + File getRecoveryFile(); + + + public static SnapshotRecovery emptyRecovery() { + return new SnapshotRecovery() { + @Override + public long getMaxTransactionId() { + return -1L; + } + + @Override + public Map getRecords() { + return Collections.emptyMap(); + } + + @Override + public Set getRecoveredSwapLocations() { + return Collections.emptySet(); + } + + @Override + public File getRecoveryFile() { + return null; + } + }; + } +} diff --git a/src/main/java/org/xbib/event/wal/StandardJournalRecovery.java b/src/main/java/org/xbib/event/wal/StandardJournalRecovery.java new file mode 100644 index 0000000..436acfb --- /dev/null +++ b/src/main/java/org/xbib/event/wal/StandardJournalRecovery.java @@ -0,0 +1,28 @@ +package org.xbib.event.wal; + +public class StandardJournalRecovery implements JournalRecovery { + private final int updateCount; + private final long maxTransactionId; + private final 
boolean eofException; + + public StandardJournalRecovery(final int updateCount, final long maxTransactionId, final boolean eofException) { + this.updateCount = updateCount; + this.maxTransactionId = maxTransactionId; + this.eofException = eofException; + } + + @Override + public int getUpdateCount() { + return updateCount; + } + + @Override + public long getMaxTransactionId() { + return maxTransactionId; + } + + @Override + public boolean isEOFExceptionEncountered() { + return eofException; + } +} diff --git a/src/main/java/org/xbib/event/wal/StandardJournalSummary.java b/src/main/java/org/xbib/event/wal/StandardJournalSummary.java new file mode 100644 index 0000000..916aaf4 --- /dev/null +++ b/src/main/java/org/xbib/event/wal/StandardJournalSummary.java @@ -0,0 +1,29 @@ +package org.xbib.event.wal; + +public class StandardJournalSummary implements JournalSummary { + private final long firstTransactionId; + private final long lastTransactionId; + private final int transactionCount; + + public StandardJournalSummary(final long firstTransactionId, final long lastTransactionId, final int transactionCount) { + this.firstTransactionId = firstTransactionId; + this.lastTransactionId = lastTransactionId; + this.transactionCount = transactionCount; + } + + @Override + public long getFirstTransactionId() { + return firstTransactionId; + } + + @Override + public long getLastTransactionId() { + return lastTransactionId; + } + + @Override + public int getTransactionCount() { + return transactionCount; + } + +} diff --git a/src/main/java/org/xbib/event/wal/StandardSnapshotRecovery.java b/src/main/java/org/xbib/event/wal/StandardSnapshotRecovery.java new file mode 100644 index 0000000..7690f38 --- /dev/null +++ b/src/main/java/org/xbib/event/wal/StandardSnapshotRecovery.java @@ -0,0 +1,39 @@ +package org.xbib.event.wal; + +import java.io.File; +import java.util.Map; +import java.util.Set; + +public class StandardSnapshotRecovery implements SnapshotRecovery { + private final Map recordMap; + private final Set recoveredSwapLocations; + private final File recoveryFile; + private final long maxTransactionId; + + public StandardSnapshotRecovery(final Map recordMap, final Set recoveredSwapLocations, final File recoveryFile, final long maxTransactionId) { + this.recordMap = recordMap; + this.recoveredSwapLocations = recoveredSwapLocations; + this.recoveryFile = recoveryFile; + this.maxTransactionId = maxTransactionId; + } + + @Override + public long getMaxTransactionId() { + return maxTransactionId; + } + + @Override + public Map getRecords() { + return recordMap; + } + + @Override + public Set getRecoveredSwapLocations() { + return recoveredSwapLocations; + } + + @Override + public File getRecoveryFile() { + return recoveryFile; + } +} diff --git a/src/main/java/org/xbib/event/wal/SyncListener.java b/src/main/java/org/xbib/event/wal/SyncListener.java new file mode 100644 index 0000000..6081b82 --- /dev/null +++ b/src/main/java/org/xbib/event/wal/SyncListener.java @@ -0,0 +1,56 @@ +package org.xbib.event.wal; + +/** + *
+ * Provides a callback mechanism by which applicable listeners can be notified + * when a WriteAheadRepository is synched (via the + * {@link WriteAheadRepository#sync()} method) or one of its partitions is + * synched via + * {@link WriteAheadRepository#update(java.util.Collection, boolean)} with a + * value of true for the second argument. + *
+ * + *
+ * It is not required that an implementation of {@link WriteAheadRepository} + * support this interface. Those that do generally will require that the + * listener be injected via the constructor. + *
+ * + *
+ * All implementations of this interface must be thread-safe. + *
+ * + *
+ * The {@link #onSync(int)} method will always be called while the associated + * partition is locked. The {@link #onGlobalSync()} will always be called while + * the entire repository is locked. + *
+ * + */ +public interface SyncListener { + + /** + * This method is called whenever a specific partition is synched via the + * {@link WriteAheadRepository#update(java.util.Collection, boolean)} method + * + * @param partitionIndex the index of the partition that was synched + */ + void onSync(int partitionIndex); + + /** + * This method is called whenever the entire + * WriteAheadRepository is synched via the + * {@link WriteAheadRepository#sync()} method. + */ + void onGlobalSync(); + + public static final SyncListener NOP_SYNC_LISTENER = new SyncListener() { + @Override + public void onSync(int partitionIndex) { + } + + @Override + public void onGlobalSync() { + } + }; +} diff --git a/src/main/java/org/xbib/event/wal/UpdateType.java b/src/main/java/org/xbib/event/wal/UpdateType.java new file mode 100644 index 0000000..d94fa2e --- /dev/null +++ b/src/main/java/org/xbib/event/wal/UpdateType.java @@ -0,0 +1,33 @@ +package org.xbib.event.wal; + +/** + *
+ * Enumerates the valid types of things that can cause a + * {@link WriteAheadRepository} to update its state.
+ */ +public enum UpdateType { + + /** + * Used when a new Record has been created + */ + CREATE, + /** + * Used when a Record has been updated in some way + */ + UPDATE, + /** + * Used to indicate that a Record has been deleted and should be removed + * from the Repository + */ + DELETE, + /** + * Used to indicate that a Record still exists but has been moved elsewhere, + * so that it is no longer maintained by the WALI instance + */ + SWAP_OUT, + /** + * Used to indicate that a Record that was previously Swapped Out is now + * being Swapped In + */ + SWAP_IN; +} diff --git a/src/main/java/org/xbib/event/wal/WriteAheadJournal.java b/src/main/java/org/xbib/event/wal/WriteAheadJournal.java new file mode 100644 index 0000000..d7842b4 --- /dev/null +++ b/src/main/java/org/xbib/event/wal/WriteAheadJournal.java @@ -0,0 +1,43 @@ +package org.xbib.event.wal; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +public interface WriteAheadJournal extends Closeable { + + JournalRecovery recoverRecords(Map recordMap, Set swapLocations) throws IOException; + + /** + * Updates the journal with the given set of records + * + * @param records the records to update + * @param recordLookup a lookup that can be used to access the current value of a record, given its ID + * + * @throws IOException if unable to write to the underlying storage mechanism + */ + void update(Collection records, RecordLookup recordLookup) throws IOException; + + void writeHeader() throws IOException; + + void fsync() throws IOException; + + /** + * Returns information about what was written to the journal + * + * @return A JournalSummary indicating what was written to the journal + */ + JournalSummary getSummary(); + + /** + * @return true if the journal is healthy and can be written to, false if either the journal has been closed or is poisoned + */ + boolean isHealthy(); + + /** + * Destroys any resources that the journal occupies + */ + void dispose(); +} diff --git a/src/main/java/org/xbib/event/wal/WriteAheadRepository.java b/src/main/java/org/xbib/event/wal/WriteAheadRepository.java new file mode 100644 index 0000000..7b5d41e --- /dev/null +++ b/src/main/java/org/xbib/event/wal/WriteAheadRepository.java @@ -0,0 +1,106 @@ +package org.xbib.event.wal; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +/** + *
diff --git a/src/main/java/org/xbib/event/wal/WriteAheadJournal.java b/src/main/java/org/xbib/event/wal/WriteAheadJournal.java new file mode 100644 index 0000000..d7842b4 --- /dev/null +++ b/src/main/java/org/xbib/event/wal/WriteAheadJournal.java @@ -0,0 +1,43 @@ +package org.xbib.event.wal; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +public interface WriteAheadJournal<T> extends Closeable { + + JournalRecovery recoverRecords(Map<Object, T> recordMap, Set<String> swapLocations) throws IOException; + + /** + * Updates the journal with the given set of records + * + * @param records the records to update + * @param recordLookup a lookup that can be used to access the current value of a record, given its ID + * + * @throws IOException if unable to write to the underlying storage mechanism + */ + void update(Collection<T> records, RecordLookup<T> recordLookup) throws IOException; + + void writeHeader() throws IOException; + + void fsync() throws IOException; + + /** + * Returns information about what was written to the journal + * + * @return A JournalSummary indicating what was written to the journal + */ + JournalSummary getSummary(); + + /** + * @return true if the journal is healthy and can be written to, false if either the journal has been closed or is poisoned + */ + boolean isHealthy(); + + /** + * Destroys any resources that the journal occupies + */ + void dispose(); +}
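A typical write path through this interface: check health, append a batch, then force the bytes to disk. The sketch below assumes an existing WriteAheadJournal<String> implementation (none is constructed here) and assumes that RecordLookup<T> is a single-method lookup-by-identifier interface, as in the upstream NiFi code this package is based on.

package org.xbib.event.wal;

import java.io.IOException;
import java.util.List;
import java.util.Map;

// Sketch of a journal write path; the journal instance is supplied by the caller.
public final class JournalWriteExample {

    public static void writeBatch(WriteAheadJournal<String> journal,
                                  Map<Object, String> currentValues,
                                  List<String> batch) throws IOException {
        if (!journal.isHealthy()) {
            throw new IOException("journal is closed or poisoned");
        }
        // The lookup gives the journal access to the current value of a record
        // by its ID (assumed to be a functional interface, used as a method ref).
        journal.update(batch, currentValues::get);
        // Force the data to durable storage before acknowledging the batch.
        journal.fsync();
    }
}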
diff --git a/src/main/java/org/xbib/event/wal/WriteAheadRepository.java b/src/main/java/org/xbib/event/wal/WriteAheadRepository.java new file mode 100644 index 0000000..7b5d41e --- /dev/null +++ b/src/main/java/org/xbib/event/wal/WriteAheadRepository.java @@ -0,0 +1,106 @@ +package org.xbib.event.wal; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +/**
+ * <p> + * A WriteAheadRepository is used to persist state that is otherwise kept + * in-memory. The Repository does not provide any query capability except to + * allow the data to be recovered upon restart of the system. + * </p> + *
+ * <p> + * A WriteAheadRepository operates by writing every update to an Edit Log. On + * restart, the data can be recovered by replaying all of the updates that are + * found in the Edit Log. This can, however, eventually result in very large + * Edit Logs, which can both take up massive amounts of disk space and take a + * long time to recover. In order to prevent this, the Repository provides a + * Checkpointing capability. This allows the current in-memory state of the + * Repository to be flushed to disk and the Edit Log to be deleted, thereby + * compacting the amount of space required to store the Repository. After a + * Checkpoint is performed, modifications are again written to an Edit Log. At + * this point, when the system is to be restored, it is restored by first + * loading the Checkpointed version of the Repository and then replaying the + * Edit Log. + * </p> + *
+ * <p> + * All implementations of WriteAheadRepository use one or more + * partitions to manage their Edit Logs. An implementation may require exactly + * one partition or may allow many partitions. + * </p> + *
+ * @param <T> the type of Record this repository is for + */ +public interface WriteAheadRepository<T> { + + /** + * <p>
+ * Updates the repository with the specified Records. The Collection must + * not contain multiple records with the same ID + * </p>
+ * + * @param records the records to update + * @param forceSync specifies whether or not the Repository forces the data + * to be flushed to disk. If false, the data may be stored in Operating + * System buffers, which improves performance but could cause loss of data + * if power is lost or the Operating System crashes + * @throws IOException if failure to update repo + * @throws IllegalArgumentException if multiple records within the given + * Collection have the same ID, as specified by the {@link SerDe#getRecordIdentifier(Object)} + * method + * + * @return the index of the Partition that performed the update + */ + int update(Collection<T> records, boolean forceSync) throws IOException; + + /** + * <p>
+ * Recovers all records from the persisted state. This method must be called + * before any updates are issued to the Repository. + * </p>
+ * + * @return recovered records + * @throws IOException if failure to read from repo + * @throws IllegalStateException if any updates have been issued against + * this Repository before this method is invoked + */ + Collection<T> recoverRecords() throws IOException; + + /** + * <p>
+ * Recovers all External Swap locations that were persisted. If this method + * is to be called, it must be called AFTER {@link #recoverRecords()} and + * BEFORE {@link #update(Collection, boolean)}. + * </p>
+ * + * @return the recovered swap locations + * @throws IOException if failure reading swap locations + */ + Set<String> getRecoveredSwapLocations() throws IOException; + + /** + * <p>
+ * Compacts the contents of the Repository so that rather than having a + * Snapshot and an Edit Log indicating many Updates to the Snapshot, the + * Snapshot is updated to contain the current state of the Repository, and + * the edit log is purged. + * </p>
+ * + * @return the number of records that were written to the new snapshot + * @throws IOException if failure during checkpoint + */ + int checkpoint() throws IOException; + + /** + * <p>
+ * Causes the repository to checkpoint and then close any open resources. + * </p> + * + * @throws IOException if failure to shutdown cleanly + */ + void shutdown() throws IOException; +}
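The contract above implies a strict call order: recover first, then read swap locations, then update (optionally batching with forceSync=false), then checkpoint and shut down. A sketch of that lifecycle, assuming a WriteAheadRepository<String> implementation obtained elsewhere:

package org.xbib.event.wal;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Set;

// Sketch of the call order required by the WriteAheadRepository contract.
public final class RepositoryLifecycleExample {

    public static void run(WriteAheadRepository<String> repo) throws IOException {
        // 1. Recovery must happen before any update is issued.
        Collection<String> recovered = repo.recoverRecords();

        // 2. Swap locations, if used, must be read after recoverRecords()
        //    and before the first update().
        Set<String> swapLocations = repo.getRecoveredSwapLocations();
        System.out.println("recovered " + recovered.size() + " records, "
                + swapLocations.size() + " swap locations");

        // 3. forceSync=false leaves data in OS buffers: faster, but the
        //    records may be lost if power fails before the next sync.
        repo.update(List.of("record-1", "record-2"), false);

        // 4. A final forced sync, then compact the snapshot and edit log.
        repo.update(List.of("record-3"), true);
        System.out.println("checkpointed " + repo.checkpoint() + " records");

        // 5. shutdown() checkpoints once more and releases resources.
        repo.shutdown();
    }
}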
diff --git a/src/main/java/org/xbib/event/wal/WriteAheadSnapshot.java b/src/main/java/org/xbib/event/wal/WriteAheadSnapshot.java new file mode 100644 index 0000000..ba26082 --- /dev/null +++ b/src/main/java/org/xbib/event/wal/WriteAheadSnapshot.java @@ -0,0 +1,19 @@ +package org.xbib.event.wal; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +public interface WriteAheadSnapshot<T> { + SnapshotCapture<T> prepareSnapshot(long maxTransactionId); + + SnapshotCapture<T> prepareSnapshot(long maxTransactionId, Set<String> swapLocations); + + void writeSnapshot(SnapshotCapture<T> snapshot) throws IOException; + + SnapshotRecovery<T> recover() throws IOException; + + void update(Collection<T> records); + + int getRecordCount(); +}
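A repository checkpoint can be assembled from these primitives: prepare a capture of the in-memory state up to a transaction ID, persist it, then discard the now-redundant edit log. The sketch below illustrates that sequence only; the field wiring, journal re-creation, and the meaning of getRecordCount() as the checkpointed record count are assumptions, not part of this change set.

package org.xbib.event.wal;

import java.io.IOException;
import java.util.Set;

// Sketch of building checkpoint() on top of a WriteAheadSnapshot.
public final class CheckpointExample<T> {

    private final WriteAheadSnapshot<T> snapshot;
    private final WriteAheadJournal<T> journal;

    public CheckpointExample(WriteAheadSnapshot<T> snapshot, WriteAheadJournal<T> journal) {
        this.snapshot = snapshot;
        this.journal = journal;
    }

    public int checkpoint(long maxTransactionId, Set<String> swapLocations) throws IOException {
        // Freeze the current in-memory state up to the given transaction ID.
        SnapshotCapture<T> capture = snapshot.prepareSnapshot(maxTransactionId, swapLocations);

        // Persist the capture; after this, the old edit log is redundant.
        snapshot.writeSnapshot(capture);

        // Release the old journal's resources; a real implementation would
        // then open a fresh journal and call writeHeader() on it.
        journal.dispose();

        // Assumed to report how many records the snapshot now tracks.
        return snapshot.getRecordCount();
    }
}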