add event log and event write-ahead-log routines

This commit is contained in:
Jörg Prante 2024-01-05 14:52:29 +01:00
parent 8aa1244d86
commit 64c4045ffa
44 changed files with 3456 additions and 5 deletions

View file

@ -58,3 +58,25 @@ https://github.com/jcustenborder/netty-codec-syslog
as of 30 October, 2023
License: Apache 2.0
----------------
The work in org.xbib.event.wal is based upon org.apache.nifi.wali
https://github.com/apache/nifi/tree/main/nifi-commons/nifi-write-ahead-log
as of 4 January, 2024
License: Apache 2.0
-----------------
The work in org.xbib.event.log is based upon com.vlkan.rfos by Volkan Yazıcı <volkan@yazi.ci>
https://github.com/vy/rotating-fos
as of June, 2021
License: Apache 2.0
-----------------

View file

@ -1,6 +1,3 @@
group = org.xbib
name = event
version = 0.0.9
org.gradle.warning.mode = ALL
org.gradle.daemon = false
version = 0.0.10

View file

@ -18,7 +18,7 @@ dependencyResolutionManagement {
version('gradle', '8.5')
version('datastructures', '5.0.6')
version('netty', '4.1.104.Final')
version('net', '4.0.3')
version('net', '4.0.4')
library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty')
library('net', 'org.xbib', 'net').versionRef('net')
library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures')

View file

@ -6,6 +6,7 @@ module org.xbib.event {
exports org.xbib.event.io;
exports org.xbib.event.io.file;
exports org.xbib.event.io.path;
exports org.xbib.event.log;
exports org.xbib.event.loop.selector;
exports org.xbib.event.loop;
exports org.xbib.event.persistence;
@ -13,6 +14,7 @@ module org.xbib.event {
exports org.xbib.event.syslog;
exports org.xbib.event.timer;
exports org.xbib.event.util;
exports org.xbib.event.wal;
exports org.xbib.event.yield;
exports org.xbib.event;
requires io.netty.buffer;

View file

@ -0,0 +1,48 @@
package org.xbib.event.log;
import java.io.IOException;
import java.io.OutputStream;
/**
 * An {@link OutputStream} decorator that keeps a running count of the bytes
 * written to the wrapped stream. The counter may be seeded with a non-zero
 * value, e.g. the size of an existing file when appending.
 */
public class ByteCountingOutputStream extends OutputStream {

    private final OutputStream outputStream;

    private long size;

    /**
     * @param outputStream the stream all writes are delegated to
     * @param size the initial byte count
     */
    public ByteCountingOutputStream(OutputStream outputStream, long size) {
        this.outputStream = outputStream;
        this.size = size;
    }

    /**
     * Returns the total byte count: the initial seed plus all bytes written.
     */
    public long size() {
        return size;
    }

    @Override
    public void write(int b) throws IOException {
        outputStream.write(b);
        size++;
    }

    @Override
    public void write(byte[] buffer) throws IOException {
        outputStream.write(buffer);
        size += buffer.length;
    }

    @Override
    public void write(byte[] buffer, int offset, int length) throws IOException {
        outputStream.write(buffer, offset, length);
        size += length;
    }

    @Override
    public void flush() throws IOException {
        outputStream.flush();
    }

    @Override
    public void close() throws IOException {
        outputStream.close();
    }
}

View file

@ -0,0 +1,42 @@
package org.xbib.event.log;
import java.io.IOException;
import java.io.Writer;
/**
 * A {@link Writer} decorator that keeps a running count of the characters
 * written to the wrapped writer. The counter may be seeded with a non-zero
 * value, e.g. the size of an existing file when appending.
 */
public class CharCountingWriter extends Writer {

    private final Writer writer;

    private long size;

    /**
     * @param writer the writer all writes are delegated to
     * @param size the initial character count
     */
    public CharCountingWriter(Writer writer, long size) {
        this.writer = writer;
        this.size = size;
    }

    /**
     * Returns the total count: the initial seed plus all characters written.
     */
    public long size() {
        return size;
    }

    @Override
    public void write(int b) throws IOException {
        writer.write(b);
        size++;
    }

    @Override
    public void write(char[] cbuf, int off, int len) throws IOException {
        writer.write(cbuf, off, len);
        size += len;
    }

    @Override
    public void flush() throws IOException {
        writer.flush();
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }
}

View file

@ -0,0 +1,13 @@
package org.xbib.event.log;
import java.time.Instant;
/**
 * A source of the current time and of upcoming rotation instants.
 * Abstracted as an interface so tests can substitute a fixed or fake time
 * source (see {@code SystemClock} for the default implementation).
 */
public interface Clock {

    /**
     * Returns the current instant.
     */
    Instant now();

    /**
     * Returns the upcoming midnight — the instant a daily rotation should
     * trigger (the time zone used is implementation-defined).
     */
    Instant midnight();

    /**
     * Returns the upcoming midnight that ends a Sunday — the instant a weekly
     * rotation should trigger.
     */
    Instant sundayMidnight();
}

View file

@ -0,0 +1,19 @@
package org.xbib.event.log;
import java.time.Instant;
/**
 * A {@link TimeBasedRotationPolicy} that triggers one rotation per day, at the
 * midnight instant reported by the configured clock.
 */
public class DailyRotationPolicy extends TimeBasedRotationPolicy {

    public DailyRotationPolicy() {
    }

    /**
     * Returns the clock's upcoming midnight as the next trigger instant.
     */
    @Override
    protected Instant getTriggerInstant(Clock clock) {
        return clock.midnight();
    }

    @Override
    public String toString() {
        return "DailyRotationPolicy";
    }
}

View file

@ -0,0 +1,47 @@
package org.xbib.event.log;
import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.Instant;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * A {@link RotationListener} that reports every rotation event through
 * {@code java.util.logging}. Stateless singleton; obtain it via
 * {@link #getInstance()}.
 */
public class LoggingRotationListener implements RotationListener {

    private static final LoggingRotationListener INSTANCE = new LoggingRotationListener();

    private static final Logger logger = Logger.getLogger(LoggingRotationListener.class.getName());

    private LoggingRotationListener() {
    }

    public static LoggingRotationListener getInstance() {
        return INSTANCE;
    }

    @Override
    public void onTrigger(RotationPolicy policy, Instant instant) {
        // Pass the pattern and parameters to JUL directly so MessageFormat
        // only runs when FINE logging is actually enabled (the previous code
        // formatted the message eagerly on every call).
        logger.log(Level.FINE, "rotation trigger policy={0}, instant={1}", new Object[] { policy, instant });
    }

    @Override
    public void onOpen(RotationPolicy policy, Instant instant) {
        logger.log(Level.FINE, "open policy={0}, instant={1}", new Object[] { policy, instant });
    }

    @Override
    public void onClose(RotationPolicy policy, Instant instant) {
        logger.log(Level.FINE, "close policy={0}, instant={1}", new Object[] { policy, instant });
    }

    @Override
    public void onSuccess(RotationPolicy policy, Instant instant, Path path) {
        logger.log(Level.FINE, "rotation success policy={0}, instant={1}, path={2}", new Object[] { policy, instant, path });
    }

    @Override
    public void onFailure(RotationPolicy policy, Instant instant, Path path, Throwable throwable) {
        // SEVERE is (almost) always enabled, so eager formatting is fine here,
        // and the throwable must be passed separately to keep the stack trace.
        String message = MessageFormat.format("rotation failure policy={0}, instant={1}, file={2}", policy, instant, path);
        logger.log(Level.SEVERE, message, throwable);
    }
}

View file

@ -0,0 +1,15 @@
package org.xbib.event.log;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
/**
 * A file-backed sink whose underlying file can be rotated (closed, renamed
 * away, and re-opened fresh) by a {@link RotationPolicy}.
 */
public interface Rotatable {

    /**
     * Returns the configuration (path, file pattern, policies, clock,
     * listener) driving this rotatable.
     */
    RotationConfig getConfig();

    /**
     * Performs a rotation on behalf of the given policy at the given instant.
     *
     * @throws IOException if closing, renaming, or re-opening the file fails
     */
    void rotate(RotationPolicy policy, Instant instant) throws IOException;

    /**
     * Callback invoked by a policy when a rotation attempt failed.
     */
    void onException(RotationPolicy policy, Instant instant, Path path, Throwable throwable);
}

View file

@ -0,0 +1,170 @@
package org.xbib.event.log;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.text.MessageFormat;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
/**
 * An {@link OutputStream} that writes to a file and rotates it according to
 * the configured {@link RotationPolicy} instances. On rotation the current
 * file is renamed using the configured file pattern, a fresh file is opened,
 * and the rotated file is optionally gzip-compressed in the background.
 */
public class RotatingFileOutputStream extends OutputStream implements Rotatable {

    private static final Logger logger = Logger.getLogger(RotatingFileOutputStream.class.getName());

    private final RotationConfig config;

    // Policies that must be consulted on every write (size-based policies).
    private final List<RotationPolicy> writeSensitivePolicies;

    // Executor used for background compression of rotated files.
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

    // Replaced on every rotation; volatile so all threads see the new stream.
    private volatile ByteCountingOutputStream byteCountingOutputStream;

    public RotatingFileOutputStream(RotationConfig config) throws IOException {
        this(config, createDefaultExecutorService());
    }

    public RotatingFileOutputStream(RotationConfig config, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) throws IOException {
        this.config = config;
        this.scheduledThreadPoolExecutor = scheduledThreadPoolExecutor;
        this.writeSensitivePolicies = config.getPolicies().stream()
                .filter(RotationPolicy::isWriteSensitive)
                .collect(Collectors.toList());
        this.byteCountingOutputStream = open(null, config.getClock().now());
        startPolicies();
    }

    /**
     * Rotates the current file: flushes it, closes it, renames it according to
     * the file pattern, re-opens a fresh file, and optionally compresses the
     * rotated file in the background. Empty files are not rotated.
     *
     * @throws IOException if flushing, renaming, or re-opening fails
     */
    @Override
    public void rotate(RotationPolicy policy, Instant instant) throws IOException {
        Objects.requireNonNull(instant, "instant");
        RotationListener listener = config.getListener();
        listener.onTrigger(policy, instant);
        byteCountingOutputStream.flush();
        if (Files.size(config.getPath()) == 0) {
            // FIX: the previous pattern "{path={}}" is not valid MessageFormat
            // syntax and made MessageFormat.format() throw at runtime; let JUL
            // format the message lazily instead.
            logger.log(Level.FINE, "empty file, skipping rotation path={0}", config.getPath());
            return;
        }
        listener.onClose(policy, instant);
        byteCountingOutputStream.close();
        Path rotated = config.getFilePattern().create(instant);
        logger.log(Level.FINE, "renaming file={0} to rotated={1}", new Object[] { config.getPath(), rotated });
        Files.move(config.getPath(), rotated);
        logger.log(Level.FINE, "re-opening file path={0}", config.getPath());
        byteCountingOutputStream = open(policy, instant);
        if (config.isCompress()) {
            compress(policy, instant, rotated, listener);
            return;
        }
        listener.onSuccess(policy, instant, rotated);
    }

    @Override
    public void onException(RotationPolicy policy, Instant instant, Path path, Throwable throwable) {
        config.getListener().onFailure(policy, instant, path, throwable);
    }

    // Gzip-compresses the rotated file on the executor, then deletes the
    // uncompressed original. Reports success only if both steps worked.
    private void compress(RotationPolicy policy, Instant instant, Path rotated, RotationListener listener) {
        scheduledThreadPoolExecutor.execute(() -> {
            Path compressed = Paths.get(rotated + ".gz");
            logger.log(Level.FINE, "compressing rotated={0} to compressed={1}", new Object[] { rotated, compressed });
            try (InputStream inputStream = Files.newInputStream(rotated);
                 GZIPOutputStream gzipOutputStream = new GZIPOutputStream(Files.newOutputStream(compressed))) {
                inputStream.transferTo(gzipOutputStream);
            } catch (Exception e) {
                logger.log(Level.SEVERE, e.getMessage(), e);
                listener.onFailure(policy, instant, rotated, e);
                // FIX: bail out here; the previous code fell through, deleting
                // the uncompressed file (data loss) and reporting onSuccess
                // even though compression failed. The sibling RotatingFileWriter
                // already returns at this point.
                return;
            }
            logger.log(Level.FINE, "deleting old file rotated={0}", rotated);
            try {
                Files.delete(rotated);
            } catch (Exception e) {
                logger.log(Level.SEVERE, e.getMessage(), e);
                listener.onFailure(policy, instant, rotated, e);
                // FIX: do not report success when the delete failed.
                return;
            }
            listener.onSuccess(policy, instant, compressed);
        });
    }

    @Override
    public RotationConfig getConfig() {
        return config;
    }

    @Override
    public synchronized void write(int b) throws IOException {
        // Notify size-based policies with the size the file is about to reach,
        // so a rotation can happen before the write lands in the new file.
        long byteCount = byteCountingOutputStream.size() + 1;
        notifyWriteSensitivePolicies(byteCount);
        byteCountingOutputStream.write(b);
    }

    @Override
    public synchronized void write(byte[] b) throws IOException {
        long byteCount = byteCountingOutputStream.size() + b.length;
        notifyWriteSensitivePolicies(byteCount);
        byteCountingOutputStream.write(b);
    }

    @Override
    public synchronized void write(byte[] b, int off, int len) throws IOException {
        long byteCount = byteCountingOutputStream.size() + len;
        notifyWriteSensitivePolicies(byteCount);
        byteCountingOutputStream.write(b, off, len);
    }

    @Override
    public synchronized void flush() throws IOException {
        byteCountingOutputStream.flush();
    }

    @Override
    public synchronized void close() throws IOException {
        config.getListener().onClose(null, config.getClock().now());
        if (byteCountingOutputStream != null) {
            byteCountingOutputStream.close();
            byteCountingOutputStream = null;
        }
        // NOTE(review): the executor is not shut down here. The default one
        // uses daemon threads so the JVM can still exit, and an externally
        // supplied executor is the caller's to manage -- confirm intended.
    }

    private static ScheduledThreadPoolExecutor createDefaultExecutorService() {
        final AtomicInteger threadCount = new AtomicInteger();
        return new ScheduledThreadPoolExecutor(4, runnable -> {
            Thread thread = new Thread(runnable, "file-rotation-" + threadCount.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
    }

    private void startPolicies() {
        for (RotationPolicy policy : config.getPolicies()) {
            policy.start(this);
        }
    }

    // Opens the target file (appending when configured and the file exists)
    // and seeds the byte counter with the current file size when appending.
    private ByteCountingOutputStream open(RotationPolicy policy, Instant instant) throws IOException {
        OutputStream fileOutputStream = config.isAppend() && Files.exists(config.getPath()) ?
                Files.newOutputStream(config.getPath(), StandardOpenOption.APPEND) :
                Files.newOutputStream(config.getPath());
        config.getListener().onOpen(policy, instant);
        long size = config.isAppend() ? Files.size(config.getPath()) : 0;
        return new ByteCountingOutputStream(fileOutputStream, size);
    }

    private void notifyWriteSensitivePolicies(long byteCount) {
        for (RotationPolicy writeSensitivePolicy : writeSensitivePolicies) {
            writeSensitivePolicy.acceptWrite(byteCount);
        }
    }

    @Override
    public String toString() {
        return "RotatingFileOutputStream " + config.getPath();
    }
}

View file

@ -0,0 +1,252 @@
package org.xbib.event.log;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.TimeZone;
/**
 * A file-name pattern for rotated files, e.g. {@code "app-%d{yyyyMMdd-HHmmss}.log"}.
 * Literal text is copied verbatim; {@code %d{...}} blocks are rendered with a
 * {@link DateTimeFormatter} built from the braces' content; {@code %%} emits a
 * literal {@code %}. At least one {@code %d{...}} directive is required.
 */
public class RotatingFilePattern {

    private static final char ESCAPE_CHAR = '%';

    private static final char DATE_TIME_DIRECTIVE_CHAR = 'd';

    private static final char DATE_TIME_BLOCK_START_CHAR = '{';

    private static final char DATE_TIME_BLOCK_END_CHAR = '}';

    // One parsed segment of the pattern: either literal text or a date-time directive.
    private interface Field {
        void render(StringBuilder builder, Instant instant);
    }

    // Literal text segment; ignores the instant.
    private static class TextField implements Field {

        private final String text;

        private TextField(String text) {
            this.text = text;
        }

        @Override
        public void render(StringBuilder builder, Instant ignored) {
            builder.append(text);
        }
    }

    // A %d{...} segment: formats the instant with the pre-built formatter.
    private static class DateTimeField implements Field {

        private final DateTimeFormatter dateTimeFormatter;

        private DateTimeField(DateTimeFormatter dateTimeFormatter) {
            this.dateTimeFormatter = dateTimeFormatter;
        }

        @Override
        public void render(StringBuilder builder, Instant instant) {
            // The formatter carries a zone (withZone), so formatting a raw
            // Instant is valid here.
            String formattedDateTime = dateTimeFormatter.format(instant);
            builder.append(formattedDateTime);
        }
    }

    private final String pattern;

    private final Locale locale;

    private final ZoneId timeZoneId;

    // Parsed segments in pattern order; built once at construction.
    private final List<Field> fields;

    private RotatingFilePattern(Builder builder) {
        this.pattern = builder.pattern;
        this.locale = builder.locale;
        this.timeZoneId = builder.timeZoneId;
        this.fields = readPattern(pattern, locale, timeZoneId);
    }

    /**
     * Parses the pattern into a list of fields.
     *
     * @throws RotatingFilePatternException on an invalid escape, an invalid
     *         date-time pattern, or when no %d{...} directive is present
     */
    private static List<Field> readPattern(String pattern, Locale locale, ZoneId timeZoneId) {
        List<Field> fields = new LinkedList<>();
        StringBuilder textBuilder = new StringBuilder();
        int totalCharCount = pattern.length();
        boolean foundDateTimeDirective = false;
        for (int charIndex = 0; charIndex < totalCharCount;) {
            char c0 = pattern.charAt(charIndex);
            if (c0 == ESCAPE_CHAR) {
                // Check if escape character is escaped ("%%" -> literal '%').
                boolean hasOneMoreChar = (totalCharCount - charIndex - 1) > 0;
                if (hasOneMoreChar) {
                    char c1 = pattern.charAt(charIndex + 1);
                    if (c1 == ESCAPE_CHAR) {
                        textBuilder.append(c1);
                        charIndex += 2;
                        continue;
                    }
                }
                // Append collected text so far, if there is any.
                if (textBuilder.length() > 0) {
                    String text = textBuilder.toString();
                    TextField field = new TextField(text);
                    fields.add(field);
                    textBuilder = new StringBuilder();
                }
                // Try to read the directive; needs at least "d{" plus one more char.
                boolean hasSufficientDateTimeChars = (totalCharCount - charIndex - 3) > 0;
                if (hasSufficientDateTimeChars) {
                    char c1 = pattern.charAt(charIndex + 1);
                    if (c1 == DATE_TIME_DIRECTIVE_CHAR) {
                        int blockStartIndex = charIndex + 2;
                        char c2 = pattern.charAt(blockStartIndex);
                        if (c2 == DATE_TIME_BLOCK_START_CHAR) {
                            int blockEndIndex = pattern.indexOf(DATE_TIME_BLOCK_END_CHAR, blockStartIndex + 1);
                            if (blockEndIndex >= 0) {
                                String dateTimePattern = pattern.substring(blockStartIndex + 1, blockEndIndex);
                                DateTimeFormatter dateTimeFormatter;
                                try {
                                    dateTimeFormatter = DateTimeFormatter
                                            .ofPattern(dateTimePattern)
                                            .withLocale(locale)
                                            .withZone(timeZoneId);
                                } catch (Exception error) {
                                    String message = String.format(
                                            "invalid date time pattern (position=%d, pattern=%s, dateTimePattern=%s)",
                                            charIndex, pattern, dateTimePattern);
                                    throw new RotatingFilePatternException(message, error);
                                }
                                DateTimeField dateTimeField = new DateTimeField(dateTimeFormatter);
                                fields.add(dateTimeField);
                                foundDateTimeDirective = true;
                                // Resume parsing just after the closing '}'.
                                charIndex = blockEndIndex + 1;
                                continue;
                            }
                        }
                    }
                }
                // Escape character leads to a dead end.
                String message = String.format("invalid escape character (position=%d, pattern=%s)", charIndex, pattern);
                throw new RotatingFilePatternException(message);
            } else {
                textBuilder.append(c0);
                charIndex += 1;
            }
        }
        // Append collected text so far, if there is any.
        if (textBuilder.length() > 0) {
            String text = textBuilder.toString();
            TextField field = new TextField(text);
            fields.add(field);
        }
        // Bail out if could not locate any date time directives.
        if (!foundDateTimeDirective) {
            String message = String.format("missing date time directive (pattern=%s)", pattern);
            throw new RotatingFilePatternException(message);
        }
        // Return collected fields so far.
        return fields;
    }

    /**
     * Renders the pattern for the given instant and returns it as a path.
     */
    public Path create(Instant instant) {
        StringBuilder pathNameBuilder = new StringBuilder();
        for (Field field : fields) {
            field.render(pathNameBuilder, instant);
        }
        String pathName = pathNameBuilder.toString();
        return Paths.get(pathName);
    }

    public String getPattern() {
        return pattern;
    }

    public Locale getLocale() {
        return locale;
    }

    public ZoneId getTimeZoneId() {
        return timeZoneId;
    }

    @Override
    public boolean equals(Object instance) {
        if (this == instance) return true;
        if (instance == null || getClass() != instance.getClass()) return false;
        RotatingFilePattern that = (RotatingFilePattern) instance;
        return Objects.equals(pattern, that.pattern) &&
                Objects.equals(locale, that.locale) &&
                Objects.equals(timeZoneId, that.timeZoneId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(pattern, locale, timeZoneId);
    }

    @Override
    public String toString() {
        return String.format("RotatingFilePattern{pattern=%s, locale=%s, timeZoneId=%s}", pattern, locale, timeZoneId);
    }

    public static Builder builder() {
        return new Builder();
    }

    /**
     * Builder for {@link RotatingFilePattern}. Locale and time zone default to
     * the platform defaults; the pattern is mandatory.
     */
    public static final class Builder {

        private String pattern;

        private Locale locale = Locale.getDefault();

        private ZoneId timeZoneId = TimeZone.getDefault().toZoneId();

        private Builder() {}

        public Builder pattern(String pattern) {
            this.pattern = pattern;
            return this;
        }

        public Builder locale(Locale locale) {
            this.locale = locale;
            return this;
        }

        public Builder timeZoneId(ZoneId timeZoneId) {
            this.timeZoneId = timeZoneId;
            return this;
        }

        public RotatingFilePattern build() {
            validate();
            return new RotatingFilePattern(this);
        }

        private void validate() {
            Objects.requireNonNull(pattern, "file");
            Objects.requireNonNull(locale, "locale");
            Objects.requireNonNull(timeZoneId, "timeZoneId");
        }
    }
}

View file

@ -0,0 +1,14 @@
package org.xbib.event.log;
/**
 * Thrown when a {@link RotatingFilePattern} cannot be parsed: an invalid
 * escape sequence, an invalid date-time pattern, or a missing directive.
 */
@SuppressWarnings("serial")
public class RotatingFilePatternException extends IllegalArgumentException {

    /**
     * @param message description of the invalid pattern
     */
    public RotatingFilePatternException(String message) {
        super(message);
    }

    /**
     * @param message description of the invalid pattern
     * @param cause the underlying parse failure
     */
    public RotatingFilePatternException(String message, Throwable cause) {
        super(message, cause);
    }
}

View file

@ -0,0 +1,149 @@
package org.xbib.event.log;
import java.io.IOException;
import java.io.InputStream;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.text.MessageFormat;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
/**
 * A {@link Writer} that writes characters to a file and rotates it according
 * to the configured {@link RotationPolicy} instances. On rotation the current
 * file is renamed using the configured file pattern, a fresh file is opened,
 * and the rotated file is optionally gzip-compressed on the config's scheduler.
 */
public class RotatingFileWriter extends Writer implements Rotatable {

    private static final Logger logger = Logger.getLogger(RotatingFileWriter.class.getName());

    private final RotationConfig config;

    // Policies that must be consulted on every write (size-based policies).
    private final List<RotationPolicy> writeSensitivePolicies;

    // Replaced on every rotation; volatile so all threads see the new writer.
    private volatile CharCountingWriter charCountingWriter;

    public RotatingFileWriter(RotationConfig config) throws IOException {
        this.config = config;
        // Unlike RotatingFileOutputStream, this class creates no default
        // executor, so the config must supply one (used for compression and
        // by time-based policies).
        Objects.requireNonNull(config.getScheduler());
        this.writeSensitivePolicies = config.getPolicies().stream()
                .filter(RotationPolicy::isWriteSensitive)
                .collect(Collectors.toList());
        this.charCountingWriter = open(null, config.getClock().now());
        startPolicies();
    }

    /**
     * Rotates the current file: flushes it, closes it, renames it according to
     * the file pattern, re-opens a fresh file, and optionally compresses the
     * rotated file in the background. Empty files are not rotated.
     *
     * @throws IOException if flushing, renaming, or re-opening fails
     */
    @Override
    public void rotate(RotationPolicy policy, Instant instant) throws IOException {
        Objects.requireNonNull(instant, "instant");
        RotationListener listener = config.getListener();
        listener.onTrigger(policy, instant);
        charCountingWriter.flush();
        if (Files.size(config.getPath()) == 0) {
            // FIX: the previous pattern "{path={}}" is not valid MessageFormat
            // syntax and made MessageFormat.format() throw at runtime; let JUL
            // format the message lazily instead.
            logger.log(Level.FINE, "empty file, skipping rotation path={0}", config.getPath());
            return;
        }
        listener.onClose(policy, instant);
        charCountingWriter.close();
        Path rotated = config.getFilePattern().create(instant);
        logger.log(Level.FINE, "renaming file={0} to rotated={1}", new Object[] { config.getPath(), rotated });
        Files.move(config.getPath(), rotated);
        logger.log(Level.FINE, "re-opening path={0}", config.getPath());
        charCountingWriter = open(policy, instant);
        if (config.isCompress()) {
            compress(policy, instant, rotated, listener);
            return;
        }
        listener.onSuccess(policy, instant, rotated);
    }

    @Override
    public void onException(RotationPolicy policy, Instant instant, Path path, Throwable throwable) {
        config.getListener().onFailure(policy, instant, path, throwable);
    }

    // Gzip-compresses the rotated file on the config's scheduler, then deletes
    // the uncompressed original. Reports success only if both steps worked.
    private void compress(RotationPolicy policy, Instant instant, Path rotated, RotationListener listener) {
        config.getScheduler().execute(() -> {
            Path compressed = Paths.get(rotated + ".gz");
            logger.log(Level.FINE, "compressing rotated={0} to compressed={1}", new Object[] { rotated, compressed });
            try (InputStream inputStream = Files.newInputStream(rotated);
                 GZIPOutputStream gzipOutputStream = new GZIPOutputStream(Files.newOutputStream(compressed))) {
                inputStream.transferTo(gzipOutputStream);
            } catch (Exception e) {
                logger.log(Level.SEVERE, e.getMessage(), e);
                listener.onFailure(policy, instant, compressed, e);
                return;
            }
            logger.log(Level.FINE, "deleting old file rotated={0}", rotated);
            try {
                Files.delete(rotated);
            } catch (Exception e) {
                logger.log(Level.SEVERE, e.getMessage(), e);
                listener.onFailure(policy, instant, compressed, e);
                return;
            }
            listener.onSuccess(policy, instant, compressed);
        });
    }

    @Override
    public RotationConfig getConfig() {
        return config;
    }

    @Override
    public synchronized void write(int b) throws IOException {
        // Notify size-based policies with the count the file is about to
        // reach, so a rotation can happen before the write lands.
        long charCount = charCountingWriter.size() + 1;
        notifyWriteSensitivePolicies(charCount);
        charCountingWriter.write(b);
    }

    // FIX: this overload was the only unsynchronized write/flush/close method,
    // making the locking inconsistent; synchronize it like the others.
    @Override
    public synchronized void write(char[] cbuf, int off, int len) throws IOException {
        long charCount = charCountingWriter.size() + len;
        notifyWriteSensitivePolicies(charCount);
        charCountingWriter.write(cbuf, off, len);
    }

    @Override
    public synchronized void flush() throws IOException {
        charCountingWriter.flush();
    }

    @Override
    public synchronized void close() throws IOException {
        config.getListener().onClose(null, config.getClock().now());
        if (charCountingWriter != null) {
            charCountingWriter.close();
            charCountingWriter = null;
        }
    }

    private void startPolicies() {
        for (RotationPolicy policy : config.getPolicies()) {
            policy.start(this);
        }
    }

    // Opens the target file (appending when configured and the file exists).
    // NOTE(review): the counter is seeded with the file's *byte* size but then
    // counts *characters*; for multi-byte encodings these diverge -- confirm
    // size-based policies accept this approximation.
    private CharCountingWriter open(RotationPolicy policy, Instant instant) throws IOException {
        Writer writer = config.isAppend() && Files.exists(config.getPath()) ?
                Files.newBufferedWriter(config.getPath(), StandardOpenOption.APPEND) :
                Files.newBufferedWriter(config.getPath());
        config.getListener().onOpen(policy, instant);
        long size = config.isAppend() ? Files.size(config.getPath()) : 0;
        return new CharCountingWriter(writer, size);
    }

    private void notifyWriteSensitivePolicies(long byteCount) {
        for (RotationPolicy writeSensitivePolicy : writeSensitivePolicies) {
            writeSensitivePolicy.acceptWrite(byteCount);
        }
    }

    @Override
    public String toString() {
        // FIX: previously returned "RotatingFileOutputStream " (copy-paste).
        return "RotatingFileWriter " + config.getPath();
    }
}

View file

@ -0,0 +1,197 @@
package org.xbib.event.log;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
/**
 * Immutable configuration for a rotating file sink: the active file path, the
 * pattern used to name rotated files, the rotation policies, and ancillary
 * collaborators (scheduler, clock, listener). Build via {@link #builder()}.
 */
public class RotationConfig {

    private final Path path;

    private final RotatingFilePattern filePattern;

    private final ScheduledExecutorService executorService;

    private final Collection<RotationPolicy> policies;

    private final boolean append;

    private final boolean compress;

    private final Clock clock;

    private final RotationListener rotationListener;

    private RotationConfig(Builder builder) {
        this.path = builder.path;
        this.filePattern = builder.filePattern;
        this.executorService = builder.executorService;
        this.policies = builder.policies;
        this.append = builder.append;
        this.compress = builder.compress;
        this.clock = builder.clock;
        this.rotationListener = builder.listener;
    }

    /** Returns the path of the active (currently written) file. */
    public Path getPath() {
        return path;
    }

    /** Returns the pattern used to name rotated files. */
    public RotatingFilePattern getFilePattern() {
        return filePattern;
    }

    /** Returns the configured rotation policies (never empty after build()). */
    public Collection<RotationPolicy> getPolicies() {
        return policies;
    }

    /** Whether to append to an existing file instead of truncating it. */
    public boolean isAppend() {
        return append;
    }

    /** Whether rotated files are gzip-compressed in the background. */
    public boolean isCompress() {
        return compress;
    }

    /** Returns the time source used for rotation scheduling. */
    public Clock getClock() {
        return clock;
    }

    /** Returns the listener notified of rotation events. */
    public RotationListener getListener() {
        return rotationListener;
    }

    /**
     * Returns the scheduler used by time-based policies and for compression.
     * NOTE(review): may be {@code null}; validate() does not require it even
     * though time-based policies and RotatingFileWriter need one -- confirm.
     */
    public ScheduledExecutorService getScheduler() {
        return executorService;
    }

    @Override
    public boolean equals(Object instance) {
        if (this == instance) return true;
        if (instance == null || getClass() != instance.getClass()) return false;
        RotationConfig that = (RotationConfig) instance;
        return append == that.append &&
                compress == that.compress &&
                Objects.equals(path, that.path) &&
                Objects.equals(filePattern, that.filePattern) &&
                Objects.equals(executorService, that.executorService) &&
                Objects.equals(policies, that.policies) &&
                Objects.equals(clock, that.clock) &&
                Objects.equals(rotationListener, that.rotationListener);
    }

    @Override
    public int hashCode() {
        return Objects.hash(path, filePattern, executorService, policies, append, compress, clock, rotationListener);
    }

    @Override
    public String toString() {
        return String.format("RotationConfig{path=%s}", path);
    }

    public static Builder builder() {
        return new Builder();
    }

    /**
     * Builder for {@link RotationConfig}. Defaults: append enabled, system
     * clock, logging rotation listener, no compression, no executor service.
     */
    public static class Builder {

        private Path path;

        private RotatingFilePattern filePattern;

        private ScheduledExecutorService executorService;

        private Collection<RotationPolicy> policies;

        private boolean append;

        private boolean compress;

        private Clock clock;

        private RotationListener listener;

        private Builder() {
            this.append = true;
            this.clock = SystemClock.getInstance();
            this.listener = LoggingRotationListener.getInstance();
        }

        public Builder path(Path path) {
            this.path = path;
            return this;
        }

        public Builder path(String fileName) {
            this.path = Paths.get(fileName);
            return this;
        }

        public Builder filePattern(RotatingFilePattern filePattern) {
            this.filePattern = filePattern;
            return this;
        }

        public Builder filePattern(String filePattern) {
            this.filePattern = RotatingFilePattern.builder().pattern(filePattern).build();
            return this;
        }

        public Builder executorService(ScheduledExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        /** Replaces the whole policy collection. */
        public Builder policies(Collection<RotationPolicy> policies) {
            this.policies = policies;
            return this;
        }

        /** Adds a single policy, creating the collection lazily. */
        public Builder policy(RotationPolicy policy) {
            if (policies == null) {
                policies = new LinkedHashSet<>();
            }
            policies.add(policy);
            return this;
        }

        public Builder append(boolean append) {
            this.append = append;
            return this;
        }

        public Builder compress(boolean compress) {
            this.compress = compress;
            return this;
        }

        public Builder clock(Clock clock) {
            this.clock = clock;
            return this;
        }

        public Builder listener(RotationListener listener) {
            this.listener = listener;
            return this;
        }

        /**
         * Validates the configuration and builds it.
         *
         * @throws NullPointerException if path, filePattern, clock, or listener is null
         * @throws IllegalArgumentException if no policy was configured
         */
        public RotationConfig build() {
            validate();
            return new RotationConfig(this);
        }

        private void validate() {
            Objects.requireNonNull(path, "path");
            Objects.requireNonNull(filePattern, "filePattern");
            if (policies == null || policies.isEmpty()) {
                throw new IllegalArgumentException("empty policies");
            }
            Objects.requireNonNull(clock, "clock");
            // FIX: the message said "callback", which does not match the field
            // it guards; name the listener so NPE messages point at the right
            // builder method.
            Objects.requireNonNull(listener, "listener");
        }
    }
}

View file

@ -0,0 +1,18 @@
package org.xbib.event.log;
import java.nio.file.Path;
import java.time.Instant;
/**
 * Receives notifications for the lifecycle of a file rotation. See
 * {@code LoggingRotationListener} for a default implementation.
 */
public interface RotationListener {

    /** Called when a policy decides a rotation should happen. */
    void onTrigger(RotationPolicy policy, Instant instant);

    /** Called after the (new) target file has been opened. */
    void onOpen(RotationPolicy policy, Instant instant);

    /** Called just before the current file is closed for rotation. */
    void onClose(RotationPolicy policy, Instant instant);

    /** Called when a rotation completed; {@code path} is the rotated file. */
    void onSuccess(RotationPolicy policy, Instant instant, Path path);

    /** Called when any step of a rotation failed. */
    void onFailure(RotationPolicy policy, Instant instant, Path path, Throwable throwable);
}

View file

@ -0,0 +1,10 @@
package org.xbib.event.log;
/**
 * A strategy that decides when a {@link Rotatable}'s file should be rotated,
 * either on a schedule (time-based) or in response to writes (size-based).
 */
public interface RotationPolicy {

    /**
     * Binds this policy to the given rotatable and starts any scheduling
     * the policy requires.
     */
    void start(Rotatable rotatable);

    /**
     * Returns {@code true} if this policy must be notified of every write
     * via {@link #acceptWrite(long)}.
     */
    boolean isWriteSensitive();

    /**
     * Notifies the policy of the count the file will reach after the pending
     * write; only called when {@link #isWriteSensitive()} is {@code true}.
     */
    void acceptWrite(long byteCount);
}

View file

@ -0,0 +1,69 @@
package org.xbib.event.log;
import java.io.IOException;
import java.time.Instant;
import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * A write-sensitive {@link RotationPolicy} that rotates as soon as a pending
 * write would push the file past a maximum byte count.
 */
public class SizeBasedRotationPolicy implements RotationPolicy {

    private static final Logger logger = Logger.getLogger(SizeBasedRotationPolicy.class.getName());

    private final long maxByteCount;

    // Set by start(); the sink this policy rotates.
    private Rotatable rotatable;

    /**
     * @param maxByteCount maximum file size in bytes, must be positive
     * @throws IllegalArgumentException if {@code maxByteCount < 1}
     */
    public SizeBasedRotationPolicy(long maxByteCount) {
        if (maxByteCount < 1) {
            String message = String.format("invalid size {maxByteCount=%d}", maxByteCount);
            throw new IllegalArgumentException(message);
        }
        this.maxByteCount = maxByteCount;
    }

    public long getMaxByteCount() {
        return maxByteCount;
    }

    @Override
    public boolean isWriteSensitive() {
        return true;
    }

    /**
     * Rotates the bound rotatable when the projected byte count exceeds the
     * maximum; rotation failures are logged and reported via onException.
     */
    @Override
    public void acceptWrite(long byteCount) {
        if (byteCount <= maxByteCount) {
            return;
        }
        Instant instant = rotatable.getConfig().getClock().now();
        try {
            rotatable.rotate(this, instant);
        } catch (IOException e) {
            logger.log(Level.SEVERE, e.getMessage(), e);
            rotatable.onException(this, instant, rotatable.getConfig().getPath(), e);
        }
    }

    @Override
    public void start(Rotatable rotatable) {
        this.rotatable = rotatable;
    }

    @Override
    public boolean equals(Object instance) {
        if (this == instance) return true;
        if (instance == null || getClass() != instance.getClass()) return false;
        SizeBasedRotationPolicy that = (SizeBasedRotationPolicy) instance;
        return maxByteCount == that.maxByteCount;
    }

    @Override
    public int hashCode() {
        return Objects.hash(maxByteCount);
    }

    @Override
    public String toString() {
        return String.format("SizeBasedRotationPolicy{maxByteCount=%d}", maxByteCount);
    }
}

View file

@ -0,0 +1,48 @@
package org.xbib.event.log;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
/**
 * The default {@link Clock}: real wall-clock time, with midnight boundaries
 * computed in UTC.
 */
public class SystemClock implements Clock {

    private static final SystemClock INSTANCE = new SystemClock();

    public SystemClock() {
    }

    public static SystemClock getInstance() {
        return INSTANCE;
    }

    @Override
    public Instant now() {
        return Instant.now();
    }

    /**
     * Returns the upcoming UTC midnight (start of tomorrow, UTC).
     */
    @Override
    public Instant midnight() {
        ZonedDateTime utcNow = now().atZone(ZoneId.of("UTC"));
        LocalDateTime startOfToday = LocalDate.from(utcNow).atStartOfDay();
        return startOfToday.plusDays(1).toInstant(ZoneOffset.UTC);
    }

    /**
     * Returns the upcoming UTC midnight that ends a Sunday, i.e. the start of
     * the next ISO week.
     */
    @Override
    public Instant sundayMidnight() {
        ZonedDateTime utcNow = now().atZone(ZoneId.of("UTC"));
        LocalDateTime startOfToday = LocalDate.from(utcNow).atStartOfDay();
        // DayOfWeek: Monday == 1 ... Sunday == 7, so (8 - value) days from the
        // start of today lands on the midnight that closes this week's Sunday.
        int daysUntilEndOfSunday = 8 - startOfToday.getDayOfWeek().getValue();
        return startOfToday.plusDays(daysUntilEndOfSunday).toInstant(ZoneOffset.UTC);
    }
}

View file

@ -0,0 +1,46 @@
package org.xbib.event.log;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Base class for rotation policies that fire on a schedule (e.g. daily or
 * weekly) rather than in response to writes. Subclasses supply the next
 * trigger instant via {@link #getTriggerInstant(Clock)}.
 */
public abstract class TimeBasedRotationPolicy implements RotationPolicy {

    private static final Logger logger = Logger.getLogger(TimeBasedRotationPolicy.class.getName());

    public TimeBasedRotationPolicy() {
    }

    /**
     * Time-based policies do not inspect individual writes.
     */
    @Override
    public boolean isWriteSensitive() {
        return false;
    }

    /**
     * Never invoked, because {@link #isWriteSensitive()} returns {@code false}.
     */
    @Override
    public void acceptWrite(long byteCount) {
        throw new UnsupportedOperationException();
    }

    /**
     * Schedules a one-shot rotation at the next trigger instant; after a
     * successful rotation the task re-schedules itself by calling
     * {@code start} again, forming the recurring chain.
     */
    @Override
    public void start(Rotatable rotatable) {
        RotationConfig config = rotatable.getConfig();
        Clock clock = config.getClock();
        Instant currentInstant = clock.now();
        Instant triggerInstant = getTriggerInstant(clock);
        long triggerDelayMillis = Duration.between(currentInstant, triggerInstant).toMillis();
        Runnable task = () -> {
            try {
                rotatable.rotate(TimeBasedRotationPolicy.this, triggerInstant);
                // Re-arm for the following period; skipped if rotate() threw.
                start(rotatable);
            } catch (Exception e) {
                logger.log(Level.SEVERE, e.getMessage(), e);
                rotatable.onException(this, currentInstant, rotatable.getConfig().getPath(), e);
            }
        };
        // NOTE(review): config.getScheduler() may be null when no executor
        // service was set on the RotationConfig builder (validate() does not
        // require one) -- confirm callers always supply a scheduler before
        // using a time-based policy.
        config.getScheduler().schedule(task, triggerDelayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the next instant at which this policy should trigger a rotation.
     */
    protected abstract Instant getTriggerInstant(Clock clock);
}

View file

@ -0,0 +1,19 @@
package org.xbib.event.log;
import java.time.Instant;
/**
 * A {@link TimeBasedRotationPolicy} that triggers one rotation per week, at
 * the Sunday-midnight instant reported by the configured clock.
 */
public class WeeklyRotationPolicy extends TimeBasedRotationPolicy {

    public WeeklyRotationPolicy() {
    }

    /**
     * Returns the clock's upcoming Sunday midnight as the next trigger instant.
     */
    @Override
    public Instant getTriggerInstant(Clock clock) {
        return clock.sundayMidnight();
    }

    @Override
    public String toString() {
        return "WeeklyRotationPolicy";
    }
}

View file

@ -0,0 +1,39 @@
package org.xbib.event.wal;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
 * A simple bounded object pool backed by a {@link LinkedBlockingQueue}.
 * Borrowing never blocks: if the queue is empty a new instance is created.
 * Returned objects are kept only if they pass the reuse check; when the queue
 * is full (or the check fails) the object is silently discarded.
 *
 * @param <T> the type of pooled object
 */
public class BlockingQueuePool<T> implements ObjectPool<T> {

    // idle objects available for reuse; bounded to maxSize
    private final BlockingQueue<T> available;

    // creates a fresh object when the pool is empty
    private final Supplier<T> creationFunction;

    // decides whether a returned object is fit for reuse
    private final Predicate<T> reuseCheck;

    // resets a returned object before it goes back into the pool
    private final Consumer<T> returnPreparation;

    public BlockingQueuePool(final int maxSize, final Supplier<T> creationFunction, final Predicate<T> reuseCheck, final Consumer<T> returnPreparation) {
        this.available = new LinkedBlockingQueue<>(maxSize);
        this.creationFunction = creationFunction;
        this.reuseCheck = reuseCheck;
        this.returnPreparation = returnPreparation;
    }

    /**
     * @return a pooled object if one is available, otherwise a newly created one
     */
    @Override
    public T borrowObject() {
        final T pooled = available.poll();
        return pooled != null ? pooled : creationFunction.get();
    }

    /**
     * Returns an object to the pool. Objects failing the reuse check are dropped;
     * otherwise they are reset and offered back (dropped if the pool is full).
     */
    @Override
    public void returnObject(final T somethingBorrowed) {
        if (!reuseCheck.test(somethingBorrowed)) {
            return;
        }
        returnPreparation.accept(somethingBorrowed);
        available.offer(somethingBorrowed);
    }
}

View file

@ -0,0 +1,28 @@
package org.xbib.event.wal;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
/**
 * Couples a {@link DataOutputStream} with the {@link ByteArrayOutputStream} it
 * wraps. Callers can use the writeXYZ convenience methods of the data stream
 * while still reaching the underlying byte-array stream for operations such as
 * size(), reset() and writeTo().
 */
public class ByteArrayDataOutputStream {

    // the in-memory buffer that ultimately receives all bytes
    private final ByteArrayOutputStream buffer;

    // data-oriented view over the buffer
    private final DataOutputStream dataView;

    /**
     * @param initialBufferSize initial capacity of the backing byte array
     */
    public ByteArrayDataOutputStream(final int initialBufferSize) {
        buffer = new ByteArrayOutputStream(initialBufferSize);
        dataView = new DataOutputStream(buffer);
    }

    public DataOutputStream getDataOutputStream() {
        return dataView;
    }

    public ByteArrayOutputStream getByteArrayOutputStream() {
        return buffer;
    }
}

View file

@ -0,0 +1,104 @@
package org.xbib.event.wal;
import java.io.IOException;
import java.io.InputStream;
/**
 * An {@link InputStream} decorator that tracks how many bytes have been read
 * and how many have been skipped. {@link #mark(int)}/{@link #reset()} are
 * honored: resetting rolls the counters back to their values at the mark.
 */
public class ByteCountingInputStream extends InputStream {

    private final InputStream delegate;

    private long bytesRead = 0L;

    private long bytesSkipped = 0L;

    // bytes read/skipped since the last mark(); subtracted back on reset()
    private long readSinceMark = 0L;

    private long skippedSinceMark = 0L;

    public ByteCountingInputStream(final InputStream in) {
        this.delegate = in;
    }

    /**
     * @param in            the stream to wrap
     * @param initialOffset number of bytes to count as already skipped
     */
    public ByteCountingInputStream(final InputStream in, final long initialOffset) {
        this.delegate = in;
        this.bytesSkipped = initialOffset;
    }

    @Override
    public int read() throws IOException {
        final int value = delegate.read();
        if (value >= 0) {
            // not end-of-stream: one byte consumed
            bytesRead++;
            readSinceMark++;
        }
        return value;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        final int count = delegate.read(b, off, len);
        if (count >= 0) {
            bytesRead += count;
            readSinceMark += count;
        }
        return count;
    }

    @Override
    public int read(byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    @Override
    public long skip(final long n) throws IOException {
        final long skipped = delegate.skip(n);
        if (skipped >= 0) {
            bytesSkipped += skipped;
            skippedSinceMark += skipped;
        }
        return skipped;
    }

    public long getBytesRead() {
        return bytesRead;
    }

    public long getBytesSkipped() {
        return bytesSkipped;
    }

    /**
     * @return total bytes consumed from the stream (read + skipped)
     */
    public long getBytesConsumed() {
        return getBytesRead() + getBytesSkipped();
    }

    @Override
    public void mark(final int readlimit) {
        delegate.mark(readlimit);
        readSinceMark = 0L;
        skippedSinceMark = 0L;
    }

    @Override
    public boolean markSupported() {
        return delegate.markSupported();
    }

    @Override
    public void reset() throws IOException {
        delegate.reset();
        // roll counters back to where they stood when mark() was called
        bytesRead -= readSinceMark;
        bytesSkipped -= skippedSinceMark;
        readSinceMark = 0L;
        skippedSinceMark = 0L;
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }

    @Override
    public int available() throws IOException {
        return delegate.available();
    }
}

View file

@ -0,0 +1,357 @@
package org.xbib.event.wal;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * A {@code WriteAheadSnapshot} that keeps the most up-to-date version of every
 * 'active' record (not deleted, not swapped out) in an in-memory map keyed by
 * record identifier, so a checkpoint can be written quickly without
 * re-processing journal files. Also serves as a {@code RecordLookup} over the
 * same map.
 * <p>
 * On disk the snapshot lives in a file named "checkpoint" inside the storage
 * directory. Writes go to "checkpoint.partial" first and are promoted by
 * delete-then-rename so that a crash at any point leaves at least one viable
 * file to recover from (see {@link #writeSnapshot} and {@link #recover}).
 */
public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T> {

    private static final Logger logger = Logger.getLogger(HashMapSnapshot.class.getName());

    // version written into the snapshot header; readers reject anything newer
    private static final int ENCODING_VERSION = 1;

    // latest state of each active record, keyed by record identifier
    private final ConcurrentMap<Object, T> recordMap = new ConcurrentHashMap<>();

    private final SerDeFactory<T> serdeFactory;

    // locations of swap files whose records are held outside recordMap
    private final Set<String> swapLocations = Collections.synchronizedSet(new HashSet<>());

    private final File storageDirectory;

    /**
     * @param storageDirectory directory holding the "checkpoint" / "checkpoint.partial" files
     * @param serdeFactory     factory for the serializer/deserializer of records
     */
    public HashMapSnapshot(final File storageDirectory, final SerDeFactory<T> serdeFactory) {
        this.serdeFactory = serdeFactory;
        this.storageDirectory = storageDirectory;
    }

    /**
     * Reads and validates the snapshot file header, rejecting files written by a
     * different snapshot class or a newer encoding version, and returns the
     * header fields needed to deserialize the records that follow.
     *
     * @param dataIn stream positioned at the start of the snapshot file
     * @return the parsed header (serde, serde version, max transaction id, record count)
     * @throws IOException if the header is incompatible or cannot be read
     */
    private SnapshotHeader validateHeader(final DataInputStream dataIn) throws IOException {
        final String snapshotClass = dataIn.readUTF();
        logger.log(Level.FINE, () -> MessageFormat.format("Snapshot Class Name for {0} is {1}",
            storageDirectory, snapshotClass));
        if (!snapshotClass.equals(HashMapSnapshot.class.getName())) {
            throw new IOException("Write-Ahead Log Snapshot located at " + storageDirectory + " was written using the "
                + snapshotClass + " class; cannot restore using " + getClass().getName());
        }
        final int snapshotVersion = dataIn.readInt();
        logger.log(Level.FINE, () -> MessageFormat.format("Snapshot version for {0} is {1}",
            storageDirectory, snapshotVersion));
        if (snapshotVersion > getVersion()) {
            throw new IOException("Write-Ahead Log Snapshot located at " + storageDirectory + " was written using version "
                + snapshotVersion + " of the " + snapshotClass + " class; cannot restore using Version " + getVersion());
        }
        final String serdeEncoding = dataIn.readUTF(); // ignore serde class name for now
        logger.log(Level.FINE, () -> MessageFormat.format("Serde encoding for Snapshot at {0} is {1}",
            storageDirectory, serdeEncoding));
        final int serdeVersion = dataIn.readInt();
        logger.log(Level.FINE, () -> MessageFormat.format("Serde version for Snapshot at {0} is {1}",
            storageDirectory, serdeVersion));
        final long maxTransactionId = dataIn.readLong();
        logger.log(Level.FINE, () -> MessageFormat.format("Max Transaction ID for Snapshot at {0} is {1}",
            storageDirectory, maxTransactionId));
        final int numRecords = dataIn.readInt();
        logger.log(Level.FINE, () -> MessageFormat.format("Number of Records for Snapshot at {0} is {1}",
            storageDirectory, numRecords));
        final SerDe<T> serde = serdeFactory.createSerDe(serdeEncoding);
        serde.readHeader(dataIn);
        return new SnapshotHeader(serde, serdeVersion, maxTransactionId, numRecords);
    }

    /**
     * Restores this snapshot's in-memory state from the on-disk checkpoint file,
     * first repairing any partial/checkpoint inconsistency left by a crash
     * during a previous {@link #writeSnapshot}.
     *
     * @return the recovered records, swap locations and max transaction id;
     *         an empty recovery if no snapshot file exists or it is 0 bytes
     * @throws IOException if the snapshot cannot be read or is incompatible
     */
    @Override
    public SnapshotRecovery<T> recover() throws IOException {
        final File partialFile = getPartialFile();
        final File snapshotFile = getSnapshotFile();
        final boolean partialExists = partialFile.exists();
        final boolean snapshotExists = snapshotFile.exists();
        // If there is no snapshot (which is the case before the first snapshot is ever created), then just
        // return an empty recovery.
        if (!partialExists && !snapshotExists) {
            return SnapshotRecovery.emptyRecovery();
        }
        if (partialExists && snapshotExists) {
            // both files exist -- assume NiFi crashed/died while checkpointing. Delete the partial file.
            Files.delete(partialFile.toPath());
        } else if (partialExists) {
            // partial exists but snapshot does not -- we must have completed
            // creating the partial, deleted the snapshot
            // but crashed before renaming the partial to the snapshot. Just
            // rename partial to snapshot
            Files.move(partialFile.toPath(), snapshotFile.toPath());
        }
        if (snapshotFile.length() == 0) {
            logger.log(Level.WARNING, () -> MessageFormat.format("{0} Found 0-byte Snapshot file; skipping Snapshot file in recovery",
                this));
            return SnapshotRecovery.emptyRecovery();
        }
        // At this point, we know the snapshotPath exists because if it didn't, then we either returned null
        // or we renamed partialPath to snapshotPath. So just Recover from snapshotPath.
        try (final DataInputStream dataIn = new DataInputStream(new BufferedInputStream(new FileInputStream(snapshotFile)))) {
            // Ensure that the header contains the information that we expect and retrieve the relevant information from the header.
            final SnapshotHeader header = validateHeader(dataIn);
            final SerDe<T> serde = header.getSerDe();
            final int serdeVersion = header.getSerDeVersion();
            final int numRecords = header.getNumRecords();
            final long maxTransactionId = header.getMaxTransactionId();
            // Read all of the records that we expect to receive.
            for (int i = 0; i < numRecords; i++) {
                final T record = serde.deserializeRecord(dataIn, serdeVersion);
                if (record == null) {
                    throw new EOFException();
                }
                final UpdateType updateType = serde.getUpdateType(record);
                if (updateType == UpdateType.DELETE) {
                    logger.log(Level.WARNING, () -> "While recovering from snapshot, found record with type 'DELETE'; this record will not be restored");
                    continue;
                }
                logger.log(Level.FINEST, () -> MessageFormat.format("Recovered from snapshot: {0}", record));
                recordMap.put(serde.getRecordIdentifier(record), record);
            }
            // Determine the location of any swap files.
            // NOTE: this local 'swapLocations' intentionally shadows the field; the
            // field is updated from it via addAll below.
            final int numSwapRecords = dataIn.readInt();
            final Set<String> swapLocations = new HashSet<>();
            for (int i = 0; i < numSwapRecords; i++) {
                swapLocations.add(dataIn.readUTF());
            }
            this.swapLocations.addAll(swapLocations);
            logger.log(Level.INFO, () -> MessageFormat.format("{0} restored {1} Records and {2} Swap Files from Snapshot, ending with Transaction ID {3}",
                this, numRecords, swapLocations.size(), maxTransactionId));
            return new StandardSnapshotRecovery<>(recordMap, swapLocations, snapshotFile, maxTransactionId);
        }
    }

    /**
     * Applies a batch of record updates to the in-memory state. DELETE removes
     * the record; SWAP_OUT moves it out of the map and remembers its swap file
     * location; SWAP_IN does the reverse; any other update type simply stores
     * the latest version of the record.
     *
     * @param records the updated records to fold into the snapshot
     */
    @Override
    public void update(final Collection<T> records) {
        // This implementation of Snapshot keeps a ConcurrentHashMap of all 'active' records
        // (meaning records that have not been removed and are not swapped out), keyed by the
        // Record Identifier. It keeps only the most up-to-date version of the Record. This allows
        // us to write the snapshot very quickly without having to re-process the journal files.
        // For each update, then, we will update the record in the map.
        for (final T record : records) {
            final Object recordId = serdeFactory.getRecordIdentifier(record);
            final UpdateType updateType = serdeFactory.getUpdateType(record);
            switch (updateType) {
                case DELETE:
                    recordMap.remove(recordId);
                    break;
                case SWAP_OUT:
                    final String location = serdeFactory.getLocation(record);
                    if (location == null) {
                        logger.log(Level.INFO, () -> "Received Record (ID=" + recordId + ") with UpdateType of SWAP_OUT but "
                            + "no indicator of where the Record is to be Swapped Out to; these records may be "
                            + "lost when the repository is restored!");
                    } else {
                        recordMap.remove(recordId);
                        this.swapLocations.add(location);
                    }
                    break;
                case SWAP_IN:
                    final String swapLocation = serdeFactory.getLocation(record);
                    if (swapLocation == null) {
                        logger.log(Level.SEVERE, () -> "Received Record (ID=" + recordId + ") with UpdateType of SWAP_IN but no "
                            + "indicator of where the Record is to be Swapped In from; these records may be duplicated "
                            + "when the repository is restored!");
                    } else {
                        swapLocations.remove(swapLocation);
                    }
                    recordMap.put(recordId, record);
                    break;
                default:
                    recordMap.put(recordId, record);
                    break;
            }
        }
    }

    @Override
    public int getRecordCount() {
        return recordMap.size();
    }

    /**
     * @param recordId identifier of the record to look up
     * @return the current in-memory version of the record, or null if absent
     */
    @Override
    public T lookup(final Object recordId) {
        return recordMap.get(recordId);
    }

    @Override
    public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId) {
        return prepareSnapshot(maxTransactionId, this.swapLocations);
    }

    /**
     * Captures a point-in-time copy of the record map and swap locations so
     * that {@link #writeSnapshot} can serialize them without holding up
     * concurrent updates.
     */
    @Override
    public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId, final Set<String> swapFileLocations) {
        return new Snapshot(new HashMap<>(recordMap), new HashSet<>(swapFileLocations), maxTransactionId);
    }

    private int getVersion() {
        return ENCODING_VERSION;
    }

    private File getPartialFile() {
        return new File(storageDirectory, "checkpoint.partial");
    }

    private File getSnapshotFile() {
        return new File(storageDirectory, "checkpoint");
    }

    /**
     * Serializes a captured snapshot to disk crash-safely: write everything to
     * "checkpoint.partial", fsync, delete the old "checkpoint", then rename the
     * partial into place. The ordering below is load-bearing -- see the inline
     * comments -- so that {@link #recover} can always find a usable file.
     *
     * @param snapshot the point-in-time capture to persist
     * @throws IOException if any write or rename step fails
     */
    @Override
    public synchronized void writeSnapshot(final SnapshotCapture<T> snapshot) throws IOException {
        final SerDe<T> serde = serdeFactory.createSerDe(null);
        final File snapshotFile = getSnapshotFile();
        final File partialFile = getPartialFile();
        // We must ensure that we do not overwrite the existing Snapshot file directly because if NiFi were
        // to be killed or crash when we are partially finished, we'd end up with no viable Snapshot file at all.
        // To avoid this, we write to a 'partial' file, then delete the existing Snapshot file, if it exists, and
        // rename 'partial' to Snaphsot. That way, if NiFi crashes, we can still restore the Snapshot by first looking
        // for a Snapshot file and restoring it, if it exists. If it does not exist, then we restore from the partial file,
        // assuming that NiFi crashed after deleting the Snapshot file and before renaming the partial file.
        //
        // If there is no Snapshot file currently but there is a Partial File, then this indicates
        // that we have deleted the Snapshot file and failed to rename the Partial File. We don't want
        // to overwrite the Partial file, because doing so could potentially lose data. Instead, we must
        // first rename it to Snapshot and then write to the partial file.
        if (!snapshotFile.exists() && partialFile.exists()) {
            final boolean rename = partialFile.renameTo(snapshotFile);
            if (!rename) {
                throw new IOException("Failed to rename partial snapshot file " + partialFile + " to " + snapshotFile);
            }
        }
        // Write to the partial file.
        try (final FileOutputStream fileOut = new FileOutputStream(getPartialFile());
             final OutputStream bufferedOut = new BufferedOutputStream(fileOut);
             final DataOutputStream dataOut = new DataOutputStream(bufferedOut)) {
            // Write out the header
            dataOut.writeUTF(HashMapSnapshot.class.getName());
            dataOut.writeInt(getVersion());
            dataOut.writeUTF(serde.getClass().getName());
            dataOut.writeInt(serde.getVersion());
            dataOut.writeLong(snapshot.getMaxTransactionId());
            dataOut.writeInt(snapshot.getRecords().size());
            serde.writeHeader(dataOut);
            // Serialize each record
            for (final T record : snapshot.getRecords().values()) {
                logger.log(Level.FINEST, () -> MessageFormat.format("Checkpointing {0}", record));
                serde.serializeRecord(record, dataOut);
            }
            // Write out the number of swap locations, followed by the swap locations themselves.
            dataOut.writeInt(snapshot.getSwapLocations().size());
            for (final String swapLocation : snapshot.getSwapLocations()) {
                dataOut.writeUTF(swapLocation);
            }
            // Ensure that we flush the Buffered Output Stream and then perform an fsync().
            // This ensures that the data is fully written to disk before we delete the existing snapshot.
            dataOut.flush();
            fileOut.getChannel().force(false);
        }
        // If the snapshot file exists, delete it
        if (snapshotFile.exists()) {
            if (!snapshotFile.delete()) {
                logger.log(Level.WARNING, () -> "Unable to delete existing Snapshot file " + snapshotFile);
            }
        }
        // Rename the partial file to Snapshot.
        final boolean rename = partialFile.renameTo(snapshotFile);
        if (!rename) {
            throw new IOException("Failed to rename partial snapshot file " + partialFile + " to " + snapshotFile);
        }
    }

    /**
     * Immutable point-in-time capture of the snapshot state, produced by
     * {@link HashMapSnapshot#prepareSnapshot(long, Set)}.
     */
    public class Snapshot implements SnapshotCapture<T> {

        private final Map<Object, T> records;

        private final long maxTransactionId;

        private final Set<String> swapLocations;

        public Snapshot(final Map<Object, T> records, final Set<String> swapLocations, final long maxTransactionId) {
            this.records = records;
            this.swapLocations = swapLocations;
            this.maxTransactionId = maxTransactionId;
        }

        @Override
        public final Map<Object, T> getRecords() {
            return records;
        }

        @Override
        public long getMaxTransactionId() {
            return maxTransactionId;
        }

        @Override
        public Set<String> getSwapLocations() {
            return swapLocations;
        }
    }

    /**
     * Parsed fields of a snapshot file header, as produced by
     * {@link #validateHeader(DataInputStream)}.
     */
    private class SnapshotHeader {

        private final SerDe<T> serde;

        private final int serdeVersion;

        private final int numRecords;

        private final long maxTransactionId;

        public SnapshotHeader(final SerDe<T> serde, final int serdeVersion, final long maxTransactionId, final int numRecords) {
            this.serde = serde;
            this.serdeVersion = serdeVersion;
            this.maxTransactionId = maxTransactionId;
            this.numRecords = numRecords;
        }

        public SerDe<T> getSerDe() {
            return serde;
        }

        public int getSerDeVersion() {
            return serdeVersion;
        }

        public long getMaxTransactionId() {
            return maxTransactionId;
        }

        public int getNumRecords() {
            return numRecords;
        }
    }
}

View file

@ -0,0 +1,10 @@
package org.xbib.event.wal;
/**
 * The outcome of recovering updates from a single write-ahead journal file.
 */
public interface JournalRecovery {

    /**
     * @return the number of updates that were recovered from the journal
     */
    int getUpdateCount();

    /**
     * @return the highest Transaction ID encountered while recovering the journal;
     *         -1 if no transaction was recovered
     */
    long getMaxTransactionId();

    /**
     * @return true if an EOFException was encountered while reading the journal,
     *         i.e. the file ended before a transaction was fully read
     */
    boolean isEOFExceptionEncountered();
}

View file

@ -0,0 +1,18 @@
package org.xbib.event.wal;
/**
 * Summary statistics about the transactions written to a single
 * write-ahead journal file.
 */
public interface JournalSummary {

    /**
     * @return the Transaction ID of the first transaction written
     */
    long getFirstTransactionId();

    /**
     * @return the Transaction ID of the last transaction written
     */
    long getLastTransactionId();

    /**
     * @return the number of transactions that were written to the journal
     */
    int getTransactionCount();
}

View file

@ -0,0 +1,607 @@
package org.xbib.event.wal;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.DecimalFormat;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
private static final Logger logger = Logger.getLogger(LengthDelimitedJournal.class.getName());
private static final int DEFAULT_MAX_IN_HEAP_SERIALIZATION_BYTES = 5 * 1024 * 1024; // 5 MB
private static final JournalSummary INACTIVE_JOURNAL_SUMMARY = new StandardJournalSummary(-1L, -1L, 0);
private static final int JOURNAL_ENCODING_VERSION = 1;
private static final byte TRANSACTION_FOLLOWS = 64;
private static final byte JOURNAL_COMPLETE = 127;
private static final int NUL_BYTE = 0;
private final File journalFile;
private final File overflowDirectory;
private final long initialTransactionId;
private final SerDeFactory<T> serdeFactory;
private final ObjectPool<ByteArrayDataOutputStream> streamPool;
private final int maxInHeapSerializationBytes;
private SerDe<T> serde;
private FileOutputStream fileOut;
private BufferedOutputStream bufferedOut;
private long currentTransactionId;
private int transactionCount;
private boolean headerWritten = false;
private volatile Throwable poisonCause = null;
private volatile boolean closed = false;
private final ByteBuffer transactionPreamble = ByteBuffer.allocate(12); // guarded by synchronized block
public LengthDelimitedJournal(final File journalFile, final SerDeFactory<T> serdeFactory, final ObjectPool<ByteArrayDataOutputStream> streamPool, final long initialTransactionId) {
this(journalFile, serdeFactory, streamPool, initialTransactionId, DEFAULT_MAX_IN_HEAP_SERIALIZATION_BYTES);
}
public LengthDelimitedJournal(final File journalFile, final SerDeFactory<T> serdeFactory, final ObjectPool<ByteArrayDataOutputStream> streamPool, final long initialTransactionId,
final int maxInHeapSerializationBytes) {
this.journalFile = journalFile;
this.overflowDirectory = new File(journalFile.getParentFile(), "overflow-" + getBaseFilename(journalFile));
this.serdeFactory = serdeFactory;
this.serde = serdeFactory.createSerDe(null);
this.streamPool = streamPool;
this.initialTransactionId = initialTransactionId;
this.currentTransactionId = initialTransactionId;
this.maxInHeapSerializationBytes = maxInHeapSerializationBytes;
}
public void dispose() {
logger.log(Level.FINE, () -> MessageFormat.format("Deleting Journal {0} because it is now encapsulated in the latest Snapshot",
journalFile.getName()));
if (!journalFile.delete() && journalFile.exists()) {
logger.log(Level.WARNING, () -> "Unable to delete expired journal file " + journalFile + "; this file should be deleted manually.");
}
if (overflowDirectory.exists()) {
final File[] overflowFiles = overflowDirectory.listFiles();
if (overflowFiles == null) {
logger.log(Level.WARNING, () -> "Unable to obtain listing of files that exist in 'overflow directory' " + overflowDirectory
+ " - this directory and any files within it can now be safely removed manually");
return;
}
for (final File overflowFile : overflowFiles) {
if (!overflowFile.delete() && overflowFile.exists()) {
logger.log(Level.WARNING, () -> "After expiring journal file " + journalFile + ", unable to remove 'overflow file' " + overflowFile + " - this file should be removed manually");
}
}
if (!overflowDirectory.delete()) {
logger.log(Level.WARNING, () -> "After expiring journal file " + journalFile + ", unable to remove 'overflow directory' " + overflowDirectory + " - this file should be removed manually");
}
}
}
private static String getBaseFilename(final File file) {
final String name = file.getName();
final int index = name.lastIndexOf(".");
if (index < 0) {
return name;
}
return name.substring(0, index);
}
private synchronized OutputStream getOutputStream() throws FileNotFoundException {
if (fileOut == null) {
fileOut = new FileOutputStream(journalFile);
bufferedOut = new BufferedOutputStream(fileOut);
}
return bufferedOut;
}
@Override
public synchronized boolean isHealthy() {
return !closed && !isPoisoned();
}
private boolean isPoisoned() {
return poisonCause != null;
}
@Override
public synchronized void writeHeader() throws IOException {
try {
final DataOutputStream outStream = new DataOutputStream(getOutputStream());
outStream.writeUTF(LengthDelimitedJournal.class.getName());
outStream.writeInt(JOURNAL_ENCODING_VERSION);
serde = serdeFactory.createSerDe(null);
outStream.writeUTF(serde.getClass().getName());
outStream.writeInt(serde.getVersion());
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos)) {
serde.writeHeader(dos);
dos.flush();
final int serdeHeaderLength = baos.size();
outStream.writeInt(serdeHeaderLength);
baos.writeTo(outStream);
}
outStream.flush();
} catch (final Throwable t) {
poison(t);
final IOException ioe = (t instanceof IOException) ? (IOException) t : new IOException("Failed to create journal file " + journalFile, t);
logger.log(Level.SEVERE, MessageFormat.format("Failed to create new journal file {0} due to {1}",
journalFile, ioe.toString()), ioe);
throw ioe;
}
headerWritten = true;
}
private synchronized SerDeAndVersion validateHeader(final DataInputStream in) throws IOException {
final String journalClassName = in.readUTF();
logger.log(Level.FINE, () -> MessageFormat.format("Write Ahead Log Class Name for {0} is {1}",
journalFile, journalClassName));
if (!LengthDelimitedJournal.class.getName().equals(journalClassName)) {
throw new IOException("Invalid header information - " + journalFile + " does not appear to be a valid journal file.");
}
final int encodingVersion = in.readInt();
logger.log(Level.FINE, () -> MessageFormat.format("Encoding version for {0} is {1}",
journalFile, encodingVersion));
if (encodingVersion > JOURNAL_ENCODING_VERSION) {
throw new IOException("Cannot read journal file " + journalFile + " because it is encoded using veresion " + encodingVersion
+ " but this version of the code only understands version " + JOURNAL_ENCODING_VERSION + " and below");
}
final String serdeClassName = in.readUTF();
logger.log(Level.FINE, () -> MessageFormat.format("Serde Class Name for {0} is {1}",
journalFile, serdeClassName));
final SerDe<T> serde;
try {
serde = serdeFactory.createSerDe(serdeClassName);
} catch (final IllegalArgumentException iae) {
throw new IOException("Cannot read journal file " + journalFile + " because the serializer/deserializer used was " + serdeClassName
+ " but this repository is configured to use a different type of serializer/deserializer");
}
final int serdeVersion = in.readInt();
logger.log(Level.FINE, () -> "Serde version is " + serdeVersion);
if (serdeVersion > serde.getVersion()) {
throw new IOException("Cannot read journal file " + journalFile + " because it is encoded using veresion " + encodingVersion
+ " of the serializer/deserializer but this version of the code only understands version " + serde.getVersion() + " and below");
}
final int serdeHeaderLength = in.readInt();
final InputStream serdeHeaderIn = new LimitingInputStream(in, serdeHeaderLength);
final DataInputStream dis = new DataInputStream(serdeHeaderIn);
serde.readHeader(dis);
return new SerDeAndVersion(serde, serdeVersion);
}
// Visible/overrideable for testing.
protected void createOverflowDirectory(final Path path) throws IOException {
Files.createDirectories(path);
}
@Override
public void update(final Collection<T> records, final RecordLookup<T> recordLookup) throws IOException {
if (!headerWritten) {
throw new IllegalStateException("Cannot update journal file " + journalFile + " because no header has been written yet.");
}
if (records.isEmpty()) {
return;
}
checkState();
File overflowFile = null;
final ByteArrayDataOutputStream bados = streamPool.borrowObject();
try {
FileOutputStream overflowFileOut = null;
try {
DataOutputStream dataOut = bados.getDataOutputStream();
for (final T record : records) {
final Object recordId = serde.getRecordIdentifier(record);
final T previousRecordState = recordLookup.lookup(recordId);
serde.serializeEdit(previousRecordState, record, dataOut);
final int size = bados.getByteArrayOutputStream().size();
if (serde.isWriteExternalFileReferenceSupported() && size > maxInHeapSerializationBytes) {
if (!overflowDirectory.exists()) {
createOverflowDirectory(overflowDirectory.toPath());
}
// If we have exceeded our threshold for how much to serialize in memory,
// flush the in-memory representation to an 'overflow file' and then update
// the Data Output Stream that is used to write to the file also.
overflowFile = new File(overflowDirectory, UUID.randomUUID().toString());
final String overflowFileName = overflowFile.getName();
logger.log(Level.FINE, () -> MessageFormat.format("Length of update with {0} records exceeds in-memory max of {1} bytes. Overflowing to {2}",
records.size(), maxInHeapSerializationBytes, overflowFileName));
overflowFileOut = new FileOutputStream(overflowFile);
bados.getByteArrayOutputStream().writeTo(overflowFileOut);
bados.getByteArrayOutputStream().reset();
// change dataOut to point to the File's Output Stream so that all subsequent records are written to the file.
dataOut = new DataOutputStream(new BufferedOutputStream(overflowFileOut));
// We now need to write to the ByteArrayOutputStream a pointer to the overflow file
// so that what is written to the actual journal is that pointer.
serde.writeExternalFileReference(overflowFile, bados.getDataOutputStream());
}
}
dataOut.flush();
// If we overflowed to an external file, we need to be sure that we sync to disk before
// updating the Journal. Otherwise, we could get to a state where the Journal was flushed to disk without the
// external file being flushed. This would result in a missed update to the FlowFile Repository.
if (overflowFileOut != null) {
if (logger.isLoggable(Level.FINE)) {
long l = overflowFile.length();
logger.log(Level.FINE, () -> MessageFormat.format("Length of update to overflow file is {0} bytes", l));
}
overflowFileOut.getFD().sync();
}
} finally {
if (overflowFileOut != null) {
try {
overflowFileOut.close();
} catch (final Exception e) {
logger.log(Level.WARNING, "Failed to close open file handle to overflow file " + overflowFile.getName(), e);
}
}
}
final ByteArrayOutputStream baos = bados.getByteArrayOutputStream();
final OutputStream out = getOutputStream();
final long transactionId;
synchronized (this) {
checkState();
try {
transactionId = currentTransactionId++;
transactionCount++;
transactionPreamble.clear();
transactionPreamble.putLong(transactionId);
transactionPreamble.putInt(baos.size());
out.write(TRANSACTION_FOLLOWS);
out.write(transactionPreamble.array());
baos.writeTo(out);
out.flush();
} catch (final Throwable t) {
// While the outter Throwable that wraps this "catch" will call Poison, it is imperative that we call poison()
// before the synchronized block is excited. Otherwise, another thread could potentially corrupt the journal before
// the poison method closes the file.
poison(t);
throw t;
}
}
logger.log(Level.FINE, () -> MessageFormat.format("Wrote Transaction {0} to journal {1} with length {2} and {3} records",
transactionId, journalFile, baos.size(), records.size()));
} catch (final Throwable t) {
poison(t);
if (overflowFile != null) {
if (!overflowFile.delete() && overflowFile.exists()) {
final String overflowFileName = overflowFile.getName();
logger.log(Level.WARNING, () -> "Failed to cleanup temporary overflow file " + overflowFileName + " - this file should be cleaned up manually.");
}
}
throw t;
} finally {
streamPool.returnObject(bados);
}
}
private void checkState() throws IOException {
final Throwable cause = this.poisonCause;
if (cause != null) {
logger.log(Level.FINE, "Cannot update Write Ahead Log because the log has already been poisoned", cause);
throw new IOException("Cannot update journal file " + journalFile + " because this journal has already encountered a failure when attempting to write to the file. "
+ "If the repository is able to checkpoint, then this problem will resolve itself. However, if the repository is unable to be checkpointed "
+ "(for example, due to being out of storage space or having too many open files), then this issue may require manual intervention.", cause);
}
if (closed) {
throw new IOException("Cannot update journal file " + journalFile + " because this journal has already been closed");
}
}
protected void poison(final Throwable t) {
this.poisonCause = t;
logger.log(Level.SEVERE, MessageFormat.format("Marking Write-Ahead journal file {0} as poisoned due to {1}",
journalFile, t), t);
try {
if (fileOut != null) {
fileOut.close();
}
closed = true;
} catch (final IOException innerIOE) {
t.addSuppressed(innerIOE);
}
}
@Override
public synchronized void fsync() throws IOException {
checkState();
try {
if (fileOut != null) {
fileOut.getChannel().force(false);
}
} catch (final IOException ioe) {
poison(ioe);
}
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
closed = true;
try {
if (fileOut != null) {
if (!isPoisoned()) {
fileOut.write(JOURNAL_COMPLETE);
}
fileOut.close();
}
} catch (final IOException ioe) {
poison(ioe);
}
}
    /**
     * Replays every complete transaction recorded in this journal file, applying the updates
     * to the given {@code recordMap} and {@code swapLocations}. Each transaction is buffered
     * and applied atomically: a transaction cut short by EOF (or by trailing NUL bytes after
     * a power loss) is discarded in its entirety rather than partially applied.
     *
     * @param recordMap updated in place with the most recent state of each recovered record
     * @param swapLocations updated in place as SWAP_IN / SWAP_OUT records are replayed
     * @return a summary of the recovery: update count, max transaction ID seen, and whether EOF was hit
     * @throws IOException if the journal is corrupted in a way not attributable to truncation
     */
    @Override
    public JournalRecovery recoverRecords(final Map<Object, T> recordMap, final Set<String> swapLocations) throws IOException {
        long maxTransactionId = -1L;
        int updateCount = 0;
        boolean eofException = false;
        logger.log(Level.INFO, () -> "Recovering records from journal " + journalFile);
        // double so that the progress-percentage computation below uses floating-point division
        final double journalLength = journalFile.length();
        try (final InputStream fis = new FileInputStream(journalFile);
             final InputStream bufferedIn = new BufferedInputStream(fis);
             final ByteCountingInputStream byteCountingIn = new ByteCountingInputStream(bufferedIn);
             final DataInputStream in = new DataInputStream(byteCountingIn)) {
            try {
                // Validate that the header is what we expect and obtain the appropriate SerDe and Version information
                final SerDeAndVersion serdeAndVersion = validateHeader(in);
                final SerDe<T> serde = serdeAndVersion.getSerDe();
                // Ensure that we get a valid transaction indicator
                int transactionIndicator = in.read();
                if (transactionIndicator != TRANSACTION_FOLLOWS && transactionIndicator != JOURNAL_COMPLETE && transactionIndicator != -1) {
                    throw new IOException("After reading " + byteCountingIn.getBytesConsumed() + " bytes from " + journalFile + ", encountered unexpected value of "
                            + transactionIndicator + " for the Transaction Indicator. This journal may have been corrupted.");
                }
                long consumedAtLog = 0L;
                // We don't want to apply the updates in a transaction until we've finished recovering the entire
                // transaction. Otherwise, we could apply say 8 out of 10 updates and then hit an EOF. In such a case,
                // we want to rollback the entire transaction. We handle this by not updating recordMap or swapLocations
                // variables directly but instead keeping track of the things that occurred and then once we've read the
                // entire transaction, we can apply those updates to the recordMap and swapLocations.
                final Map<Object, T> transactionRecordMap = new HashMap<>();
                final Set<Object> idsRemoved = new HashSet<>();
                final Set<String> swapLocationsRemoved = new HashSet<>();
                final Set<String> swapLocationsAdded = new HashSet<>();
                int transactionUpdates = 0;
                // While we have a transaction to recover, recover it
                while (transactionIndicator == TRANSACTION_FOLLOWS) {
                    transactionRecordMap.clear();
                    idsRemoved.clear();
                    swapLocationsRemoved.clear();
                    swapLocationsAdded.clear();
                    transactionUpdates = 0;
                    // Format is <Transaction ID: 8 bytes> <Transaction Length: 4 bytes> <Transaction data: # of bytes indicated by Transaction Length Field>
                    final long transactionId = in.readLong();
                    maxTransactionId = Math.max(maxTransactionId, transactionId);
                    final int transactionLength = in.readInt();
                    // Use SerDe to deserialize the update. We use a LimitingInputStream to ensure that the SerDe is not able to read past its intended
                    // length, in case there is a bug in the SerDe. We then use a ByteCountingInputStream so that we can ensure that all of the data has
                    // been read and throw EOFException otherwise.
                    final InputStream transactionLimitingIn = new LimitingInputStream(in, transactionLength);
                    final ByteCountingInputStream transactionByteCountingIn = new ByteCountingInputStream(transactionLimitingIn);
                    final DataInputStream transactionDis = new DataInputStream(transactionByteCountingIn);
                    while (transactionByteCountingIn.getBytesConsumed() < transactionLength || serde.isMoreInExternalFile()) {
                        final T record = serde.deserializeEdit(transactionDis, recordMap, serdeAndVersion.getVersion());
                        // Update our RecordMap so that we have the most up-to-date version of the Record.
                        final Object recordId = serde.getRecordIdentifier(record);
                        final UpdateType updateType = serde.getUpdateType(record);
                        switch (updateType) {
                            case DELETE: {
                                idsRemoved.add(recordId);
                                transactionRecordMap.remove(recordId);
                                break;
                            }
                            case SWAP_IN: {
                                final String location = serde.getLocation(record);
                                if (location == null) {
                                    logger.log(Level.SEVERE, "Recovered SWAP_IN record from edit log, but it did not contain a Location; skipping record");
                                } else {
                                    swapLocationsRemoved.add(location);
                                    swapLocationsAdded.remove(location);
                                    transactionRecordMap.put(recordId, record);
                                }
                                break;
                            }
                            case SWAP_OUT: {
                                final String location = serde.getLocation(record);
                                if (location == null) {
                                    logger.log(Level.SEVERE, "Recovered SWAP_OUT record from edit log, but it did not contain a Location; skipping record");
                                } else {
                                    swapLocationsRemoved.remove(location);
                                    swapLocationsAdded.add(location);
                                    idsRemoved.add(recordId);
                                    transactionRecordMap.remove(recordId);
                                }
                                break;
                            }
                            default: {
                                // CREATE / UPDATE: latest state within the transaction wins
                                transactionRecordMap.put(recordId, record);
                                idsRemoved.remove(recordId);
                                break;
                            }
                        }
                        transactionUpdates++;
                    }
                    // Apply the transaction
                    for (final Object id : idsRemoved) {
                        recordMap.remove(id);
                    }
                    recordMap.putAll(transactionRecordMap);
                    swapLocations.removeAll(swapLocationsRemoved);
                    swapLocations.addAll(swapLocationsAdded);
                    updateCount += transactionUpdates;
                    // Check if there is another transaction to read
                    transactionIndicator = in.read();
                    if (transactionIndicator != TRANSACTION_FOLLOWS && transactionIndicator != JOURNAL_COMPLETE && transactionIndicator != -1) {
                        throw new IOException("After reading " + byteCountingIn.getBytesConsumed() + " bytes from " + journalFile + ", encountered unexpected value of "
                                + transactionIndicator + " for the Transaction Indicator. This journal may have been corrupted.");
                    }
                    // If we have a very large journal (for instance, if checkpoint is not called for a long time, or if there is a problem rolling over
                    // the journal), then we want to occasionally notify the user that we are, in fact, making progress, so that it doesn't appear that
                    // NiFi has become "stuck".
                    final long consumed = byteCountingIn.getBytesConsumed();
                    if (consumed - consumedAtLog > 50_000_000) {
                        final double percentage = consumed / journalLength * 100D;
                        final String pct = new DecimalFormat("#.00").format(percentage);
                        final int count = updateCount;
                        logger.log(Level.INFO, () -> MessageFormat.format("{0}% of the way finished recovering journal {1}, having recovered {2} updates",
                                pct, journalFile, count));
                        consumedAtLog = consumed;
                    }
                }
            } catch (final EOFException eof) {
                eofException = true;
                logger.log(Level.WARNING, () -> MessageFormat.format("Encountered unexpected End-of-File when reading journal file {0}; assuming that NiFi was shutdown unexpectedly and continuing recovery",
                        journalFile));
            } catch (final Exception e) {
                // If the stream consists solely of NUL bytes, then we want to treat it
                // the same as an EOF because we see this happen when we suddenly lose power
                // while writing to a file. However, if that is not the case, then something else has gone wrong.
                // In such a case, there is not much that we can do but to re-throw the Exception.
                if (remainingBytesAllNul(in)) {
                    logger.log(Level.WARNING, "Failed to recover some of the data from Write-Ahead Log Journal because encountered trailing NUL bytes. "
                            + "This will sometimes happen after a sudden power loss. The rest of this journal file will be skipped for recovery purposes."
                            + "The following Exception was encountered while recovering the updates to the journal:", e);
                } else {
                    throw e;
                }
            }
        }
        // copy to an effectively-final local so the logging lambda can capture it
        final int count = updateCount;
        logger.log(Level.INFO, () -> MessageFormat.format("Successfully recovered {0} updates from journal {1}", count, journalFile));
        return new StandardJournalRecovery(updateCount, maxTransactionId, eofException);
    }
/**
* In the case of a sudden power loss, it is common - at least in a Linux journaling File System -
* that the partition file that is being written to will have many trailing "NUL bytes" (0's).
* If this happens, then on restart we want to treat this as an incomplete transaction, so we detect
* this case explicitly.
*
* @param in the input stream to scan
* @return <code>true</code> if the InputStream contains no data or contains only NUL bytes
* @throws IOException if unable to read from the given InputStream
*/
private boolean remainingBytesAllNul(final InputStream in) throws IOException {
int nextByte;
while ((nextByte = in.read()) != -1) {
if (nextByte != NUL_BYTE) {
return false;
}
}
return true;
}
@Override
public synchronized JournalSummary getSummary() {
if (transactionCount < 1) {
return INACTIVE_JOURNAL_SUMMARY;
}
return new StandardJournalSummary(initialTransactionId, currentTransactionId - 1, transactionCount);
}
    /**
     * Pairs the SerDe obtained from a journal header with the serialization version recorded
     * alongside it, so that records can be deserialized with the same version they were
     * written with (see {@code validateHeader}).
     */
    private class SerDeAndVersion {

        // SerDe used to (de)serialize records for this journal
        private final SerDe<T> serde;

        // serialization version recorded in the journal header
        private final int version;

        public SerDeAndVersion(final SerDe<T> serde, final int version) {
            this.serde = serde;
            this.version = version;
        }

        public SerDe<T> getSerDe() {
            return serde;
        }

        public int getVersion() {
            return version;
        }
    }
}

View file

@ -0,0 +1,122 @@
package org.xbib.event.wal;
import java.io.IOException;
import java.io.InputStream;
/**
 * An InputStream that exposes at most {@code limit} bytes of an underlying stream.
 * Once the limit has been reached, all subsequent reads return -1 and
 * {@link #hasReachedLimit()} reports {@code true}. The limit is inclusive: if all
 * 100 bytes of a 100-byte stream are read through a limit of 100, the limit is
 * considered reached.
 *
 * <p>Fixes over the previous revision: {@link #available()} is capped at the
 * remaining limit (it previously could report bytes that reads would never return),
 * zero-length reads return 0 as required by the {@link InputStream} contract (they
 * previously returned -1 once the limit was reached), and {@link #read(byte[])}
 * delegates to {@link #read(byte[], int, int)} so the limit logic lives in one place.</p>
 */
public class LimitingInputStream extends InputStream {

    private final InputStream in;
    private final long limit;
    private long bytesRead = 0;
    // volatile so hasReachedLimit() observes the flag even when polled from another thread
    private volatile boolean limitReached = false;
    // value of bytesRead at the time of the last mark(); -1 when no mark is set
    private long markOffset = -1L;

    /**
     * Constructs a limited input stream whereby if the limit is reached all
     * subsequent calls to read will return a -1 and hasReachedLimit() will
     * indicate true.
     *
     * @param in the underlying input stream
     * @param limit maximum number of bytes to read from the underlying input stream
     */
    public LimitingInputStream(final InputStream in, final long limit) {
        this.in = in;
        this.limit = limit;
    }

    /**
     * @return {@code true} once a read has run into the configured limit
     * @throws IOException never; declared for interface stability
     */
    public boolean hasReachedLimit() throws IOException {
        return limitReached;
    }

    // Records that the limit has been hit and returns the EOF sentinel.
    private int markLimitReached() {
        limitReached = true;
        return -1;
    }

    @Override
    public int read() throws IOException {
        if (bytesRead >= limit) {
            return markLimitReached();
        }
        final int val = in.read();
        if (val > -1) {
            bytesRead++;
        }
        return val;
    }

    @Override
    public int read(final byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    @Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        if (len == 0) {
            // InputStream contract: a zero-length read returns 0, even at the limit
            return 0;
        }
        if (bytesRead >= limit) {
            return markLimitReached();
        }
        // never ask the underlying stream for more than the limit still allows
        final int maxToRead = (int) Math.min(len, limit - bytesRead);
        final int val = in.read(b, off, maxToRead);
        if (val > 0) {
            bytesRead += val;
        }
        return val;
    }

    @Override
    public long skip(final long n) throws IOException {
        final long toSkip = Math.min(n, limit - bytesRead);
        final long skipped = in.skip(toSkip);
        bytesRead += skipped;
        return skipped;
    }

    @Override
    public int available() throws IOException {
        // cap at the remaining budget: bytes beyond the limit can never be read from this stream
        return (int) Math.min(in.available(), Math.max(0L, limit - bytesRead));
    }

    @Override
    public void close() throws IOException {
        in.close();
    }

    @Override
    public void mark(int readlimit) {
        in.mark(readlimit);
        markOffset = bytesRead;
    }

    @Override
    public boolean markSupported() {
        return in.markSupported();
    }

    @Override
    public void reset() throws IOException {
        in.reset();
        // rewind our byte counter to the marked position as well
        if (markOffset >= 0) {
            bytesRead = markOffset;
        }
        markOffset = -1;
    }

    /**
     * @return the maximum number of bytes this stream will provide
     */
    public long getLimit() {
        return limit;
    }
}

View file

@ -0,0 +1,8 @@
package org.xbib.event.wal;
/**
 * A pool of reusable objects, used to recycle expensive-to-create instances (for example,
 * serialization buffers) instead of allocating fresh ones for every operation.
 *
 * @param <T> the type of object held by the pool
 */
public interface ObjectPool<T> {

    /**
     * Obtains an object from the pool. Whether this blocks or creates a new instance
     * when the pool is empty is implementation-defined — TODO confirm against
     * {@code BlockingQueuePool}.
     *
     * @return a pooled object, ready for use
     */
    T borrowObject();

    /**
     * Returns a previously borrowed object to the pool so that it may be reused.
     *
     * @param somethingBorrowed the object obtained from {@link #borrowObject()}
     */
    void returnObject(T somethingBorrowed);
}

View file

@ -0,0 +1,12 @@
package org.xbib.event.wal;
/**
 * Resolves the current (most recently known) state of a record by its identifier.
 * Used during journal updates to supply the previous state of a record being updated.
 *
 * @param <T> the type of record being looked up
 */
public interface RecordLookup<T> {

    /**
     * Returns the Record with the given identifier, or <code>null</code> if no such record exists
     *
     * @param identifier the identifier of the record to lookup
     * @return the Record with the given identifier, or <code>null</code> if no such record exists
     */
    T lookup(Object identifier);
}

View file

@ -0,0 +1,327 @@
package org.xbib.event.wal;
import java.io.File;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
/**
* <p>
* This implementation of WriteAheadRepository provides the ability to write all updates to the
* repository sequentially by writing to a single journal file. Serialization of data into bytes
* happens outside of any lock contention and is done so using recycled byte buffers. As such,
* we occur minimal garbage collection and the theoretical throughput of this repository is equal
* to the throughput of the underlying disk itself.
* </p>
*
* <p>
* This implementation makes the assumption that only a single thread will ever issue updates for
* a given Record at any one time. I.e., the implementation is thread-safe but cannot guarantee
* that records are recovered correctly if two threads simultaneously update the write-ahead log
* with updates for the same record.
* </p>
*/
public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T> {

    // This implementation keeps a single journal, so every update reports partition 0
    private static final int PARTITION_INDEX = 0;

    private static final Logger logger = Logger.getLogger(SequentialAccessWriteAheadLog.class.getName());

    // Journal files are named <min transaction id>.journal
    private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal");

    // Sizing of the pool of recycled serialization buffers
    private static final int MAX_BUFFERS = 64;
    private static final int BUFFER_SIZE = 256 * 1024;

    private final File storageDirectory;
    private final File journalsDirectory;
    protected final SerDeFactory<T> serdeFactory;
    private final SyncListener syncListener;
    private final Set<String> recoveredSwapLocations = new HashSet<>();

    // Updates take the read lock (and may therefore run concurrently with one another);
    // checkpoint() and shutdown() take the write lock so the journal can be swapped/closed safely.
    private final ReadWriteLock journalRWLock = new ReentrantReadWriteLock();
    private final Lock journalReadLock = journalRWLock.readLock();
    private final Lock journalWriteLock = journalRWLock.writeLock();

    // Pool of recycled in-memory buffers used for serializing records. The second lambda looks
    // like a validity predicate (buffers that grew past BUFFER_SIZE are presumably discarded
    // rather than pooled — TODO confirm against BlockingQueuePool); the third resets a buffer
    // before reuse.
    private final ObjectPool<ByteArrayDataOutputStream> streamPool = new BlockingQueuePool<>(MAX_BUFFERS,
            () -> new ByteArrayDataOutputStream(BUFFER_SIZE),
            stream -> stream.getByteArrayOutputStream().size() < BUFFER_SIZE,
            stream -> stream.getByteArrayOutputStream().reset());

    private final WriteAheadSnapshot<T> snapshot;
    private final RecordLookup<T> recordLookup;
    private SnapshotRecovery<T> snapshotRecovery;

    private volatile boolean recovered = false;
    private WriteAheadJournal<T> journal;
    private volatile long nextTransactionId = 0L;

    public SequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory<T> serdeFactory) throws IOException {
        this(storageDirectory, serdeFactory, SyncListener.NOP_SYNC_LISTENER);
    }

    /**
     * @param storageDirectory directory holding the snapshot and the journals/ subdirectory; created if absent
     * @param serdeFactory factory producing the serializer/deserializer for records
     * @param syncListener notified on per-partition sync and global sync; may be null (a no-op listener is substituted)
     * @throws IOException if the storage or journals directory cannot be created, or storageDirectory is a regular file
     */
    public SequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory<T> serdeFactory, final SyncListener syncListener) throws IOException {
        if (!storageDirectory.exists() && !storageDirectory.mkdirs()) {
            throw new IOException("Directory " + storageDirectory + " does not exist and cannot be created");
        }
        if (!storageDirectory.isDirectory()) {
            throw new IOException("File " + storageDirectory + " is a regular file and not a directory");
        }
        // The same HashMapSnapshot serves both as the snapshot and as the record-lookup source
        final HashMapSnapshot<T> hashMapSnapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory);
        this.snapshot = hashMapSnapshot;
        this.recordLookup = hashMapSnapshot;
        this.storageDirectory = storageDirectory;
        this.journalsDirectory = new File(storageDirectory, "journals");
        if (!journalsDirectory.exists() && !journalsDirectory.mkdirs()) {
            throw new IOException("Directory " + journalsDirectory + " does not exist and cannot be created");
        }
        recovered = false;
        this.serdeFactory = serdeFactory;
        this.syncListener = (syncListener == null) ? SyncListener.NOP_SYNC_LISTENER : syncListener;
    }

    /**
     * Appends the given records to the journal and applies them to the in-memory snapshot.
     * Must not be called until {@link #recoverRecords()} has completed.
     *
     * @param records the records to update
     * @param forceSync if true, the journal is fsync'd before this method returns
     * @return the partition index that was updated (always 0 for this implementation)
     * @throws IOException if unable to update the journal
     */
    @Override
    public int update(final Collection<T> records, final boolean forceSync) throws IOException {
        if (!recovered) {
            throw new IllegalStateException("Cannot update repository until record recovery has been performed");
        }
        // Read lock: updates may proceed concurrently with each other, but not with
        // checkpoint()/shutdown(), which hold the write lock while replacing/closing the journal.
        journalReadLock.lock();
        try {
            journal.update(records, recordLookup);
            if (forceSync) {
                journal.fsync();
                syncListener.onSync(PARTITION_INDEX);
            }
            snapshot.update(records);
        } finally {
            journalReadLock.unlock();
        }
        return PARTITION_INDEX;
    }

    /**
     * Recovers the repository state: first restores the most recent snapshot, then replays
     * every journal file whose transactions are newer than the snapshot, and finally performs
     * a checkpoint so the log starts from a consistent state. May only be invoked once.
     *
     * @return the recovered records
     * @throws IOException if unable to read the snapshot or journal files
     */
    @Override
    public synchronized Collection<T> recoverRecords() throws IOException {
        if (recovered) {
            throw new IllegalStateException("Cannot recover records from repository because record recovery has already commenced");
        }
        logger.log(Level.INFO, () -> MessageFormat.format("Recovering records from Write-Ahead Log at {0}",
                storageDirectory));
        final long recoverStart = System.nanoTime();
        recovered = true;
        snapshotRecovery = snapshot.recover();
        this.recoveredSwapLocations.addAll(snapshotRecovery.getRecoveredSwapLocations());
        final long snapshotRecoveryMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - recoverStart);
        final Map<Object, T> recoveredRecords = snapshotRecovery.getRecords();
        final Set<String> swapLocations = snapshotRecovery.getRecoveredSwapLocations();
        final File[] journalFiles = journalsDirectory.listFiles(this::isJournalFile);
        if (journalFiles == null) {
            throw new IOException("Cannot access the list of files in directory " + journalsDirectory +
                    "; please ensure that appropriate file permissions are set.");
        }
        if (snapshotRecovery.getRecoveryFile() == null) {
            logger.log(Level.INFO, () -> MessageFormat.format("No Snapshot File to recover from at {0}. Now recovering records from {1} journal files",
                    storageDirectory, journalFiles.length));
        } else {
            logger.log(Level.INFO, () -> MessageFormat.format("Successfully recovered {0} records and {1} swap files from Snapshot at {2} with Max Transaction ID of {3} in {4} milliseconds. Now recovering records from {5} journal files",
                    recoveredRecords.size(), swapLocations.size(), snapshotRecovery.getRecoveryFile(), snapshotRecovery.getMaxTransactionId(),
                    snapshotRecoveryMillis, journalFiles.length));
        }
        // Replay journals in ascending transaction-ID order so later updates win
        final List<File> orderedJournalFiles = Arrays.asList(journalFiles);
        orderedJournalFiles.sort((o1, o2) -> {
            final long transactionId1 = getMinTransactionId(o1);
            final long transactionId2 = getMinTransactionId(o2);
            return Long.compare(transactionId1, transactionId2);
        });
        final long snapshotTransactionId = snapshotRecovery.getMaxTransactionId();
        int totalUpdates = 0;
        int journalFilesRecovered = 0;
        int journalFilesSkipped = 0;
        long maxTransactionId = snapshotTransactionId;
        for (final File journalFile : orderedJournalFiles) {
            final long journalMinTransactionId = getMinTransactionId(journalFile);
            // Journals that predate the snapshot are already encapsulated by it
            if (journalMinTransactionId < snapshotTransactionId) {
                logger.log(Level.FINE, () -> MessageFormat.format("Will not recover records from journal file {0} because the minimum Transaction ID for that journal is {1} and the Transaction ID recovered from Snapshot was {2}",
                        journalFile, journalMinTransactionId, snapshotTransactionId));
                journalFilesSkipped++;
                continue;
            }
            logger.log(Level.FINE, () -> MessageFormat.format("Min Transaction ID for journal {0} is {1}, so will recover records from journal",
                    journalFile, journalMinTransactionId));
            journalFilesRecovered++;
            try (final WriteAheadJournal<T> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
                final JournalRecovery journalRecovery = journal.recoverRecords(recoveredRecords, swapLocations);
                final int updates = journalRecovery.getUpdateCount();
                logger.log(Level.FINE, () -> MessageFormat.format("Recovered {0} updates from journal {1}",
                        updates, journalFile));
                totalUpdates += updates;
                maxTransactionId = Math.max(maxTransactionId, journalRecovery.getMaxTransactionId());
            }
        }
        logger.log(Level.FINE, MessageFormat.format("Recovered {0} updates from {1} journal files and skipped {2} journal files because their data was already encapsulated in the snapshot",
                totalUpdates, journalFilesRecovered, journalFilesSkipped));
        this.nextTransactionId = maxTransactionId + 1;
        final long recoverNanos = System.nanoTime() - recoverStart;
        final long recoveryMillis = TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS);
        logger.log(Level.INFO, () -> MessageFormat.format("Successfully recovered {0} records in {1} milliseconds. Now checkpointing to ensure that Write-Ahead Log is in a consistent state",
                recoveredRecords.size(), recoveryMillis));
        this.recoveredSwapLocations.addAll(swapLocations);
        checkpoint(this.recoveredSwapLocations);
        return recoveredRecords.values();
    }

    // Journal files are named <min transaction id>.journal; extract the numeric prefix.
    private long getMinTransactionId(final File journalFile) {
        final String filename = journalFile.getName();
        final String numeral = filename.substring(0, filename.indexOf("."));
        return Long.parseLong(numeral);
    }

    // Accepts regular files matching the <digits>.journal naming convention
    private boolean isJournalFile(final File file) {
        if (!file.isFile()) {
            return false;
        }
        final String filename = file.getName();
        return JOURNAL_FILENAME_PATTERN.matcher(filename).matches();
    }

    /**
     * @return an unmodifiable view of the swap locations recovered from the snapshot and journals
     * @throws IOException never directly; declared by the repository interface
     */
    @Override
    public synchronized Set<String> getRecoveredSwapLocations() throws IOException {
        if (!recovered) {
            throw new IllegalStateException("Cannot retrieve the Recovered Swap Locations until record recovery has been performed");
        }
        return Collections.unmodifiableSet(this.recoveredSwapLocations);
    }

    /**
     * Captures the current snapshot state without performing a checkpoint.
     */
    public SnapshotCapture<T> captureSnapshot() {
        return snapshot.prepareSnapshot(nextTransactionId - 1);
    }

    @Override
    public int checkpoint() throws IOException {
        return checkpoint(null);
    }

    /**
     * Performs a checkpoint: fsyncs and closes the current journal, captures a snapshot of the
     * current state, starts a fresh journal, writes the snapshot, and disposes of the journals
     * the snapshot has made obsolete. Only journal rollover and snapshot capture happen under
     * the write lock ("stop-the-world"); writing the snapshot itself does not block updates.
     *
     * @param swapLocations swap locations to include in the snapshot, or null to let the
     *                      snapshot supply its own
     * @return the number of records contained in the written snapshot
     * @throws IOException if unable to write the snapshot or create the new journal
     */
    private int checkpoint(final Set<String> swapLocations) throws IOException {
        final SnapshotCapture<T> snapshotCapture;
        final long startNanos = System.nanoTime();
        final File[] existingJournals;
        journalWriteLock.lock();
        try {
            if (journal != null) {
                final JournalSummary journalSummary = journal.getSummary();
                // Nothing has been written since the last checkpoint: skip the expensive work
                if (journalSummary.getTransactionCount() == 0 && journal.isHealthy()) {
                    logger.log(Level.FINE, "Will not checkpoint Write-Ahead Log because no updates have occurred since last checkpoint");
                    return snapshot.getRecordCount();
                }
                try {
                    journal.fsync();
                } catch (final Exception e) {
                    logger.log(Level.SEVERE, "Failed to synch Write-Ahead Log's journal to disk at " + storageDirectory, e);
                }
                try {
                    journal.close();
                } catch (final Exception e) {
                    logger.log(Level.SEVERE, "Failed to close Journal while attempting to checkpoint Write-Ahead Log at " + storageDirectory, e);
                }
                nextTransactionId = Math.max(nextTransactionId, journalSummary.getLastTransactionId() + 1);
            }
            syncListener.onGlobalSync();
            final File[] existingFiles = journalsDirectory.listFiles(this::isJournalFile);
            existingJournals = (existingFiles == null) ? new File[0] : existingFiles;
            if (swapLocations == null) {
                snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1);
            } else {
                snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1, swapLocations);
            }
            // Create a new journal. We name the journal file <next transaction id>.journal but it is possible
            // that we could have an empty journal file already created. If this happens, we don't want to create
            // a new file on top of it because it would get deleted below when we clean up old journals. So we
            // will simply increment our transaction ID and try again.
            File journalFile = new File(journalsDirectory, nextTransactionId + ".journal");
            while (journalFile.exists()) {
                nextTransactionId++;
                journalFile = new File(journalsDirectory, nextTransactionId + ".journal");
            }
            journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, nextTransactionId);
            journal.writeHeader();
            logger.log(Level.FINE, () -> MessageFormat.format("Created new Journal starting with Transaction ID {0}", nextTransactionId));
        } finally {
            journalWriteLock.unlock();
        }
        final long stopTheWorldMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        // Updates may resume while the snapshot is written and old journals are disposed of
        snapshot.writeSnapshot(snapshotCapture);
        for (final File existingJournal : existingJournals) {
            final WriteAheadJournal<?> journal = new LengthDelimitedJournal<>(existingJournal, serdeFactory, streamPool, nextTransactionId);
            journal.dispose();
        }
        final long totalNanos = System.nanoTime() - startNanos;
        final long millis = TimeUnit.NANOSECONDS.toMillis(totalNanos);
        logger.log(Level.INFO, () -> MessageFormat.format("Checkpointed Write-Ahead Log with {0} Records and {1} Swap Files in {2} milliseconds (Stop-the-world time = {3} milliseconds), max Transaction ID {4}",
                snapshotCapture.getRecords().size(), snapshotCapture.getSwapLocations().size(), millis, stopTheWorldMillis, snapshotCapture.getMaxTransactionId()));
        return snapshotCapture.getRecords().size();
    }

    /**
     * Closes the current journal (if any) under the write lock so that no update races the close.
     *
     * @throws IOException if the journal fails to close
     */
    @Override
    public void shutdown() throws IOException {
        journalWriteLock.lock();
        try {
            if (journal != null) {
                journal.close();
            }
        } finally {
            journalWriteLock.unlock();
        }
    }
}

View file

@ -0,0 +1,172 @@
package org.xbib.event.wal;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Map;
/**
 * A mechanism for Serializing and De-Serializing a Record of a given Type
 *
 * @param <T> the type of record that is to be Serialized and De-Serialized by
 * this object
 */
public interface SerDe<T> {

    /**
     * Provides the SerDe a chance to write header information to the given output stream.
     * The default implementation writes nothing.
     *
     * @param out the DataOutputStream to write to
     * @throws IOException if unable to write to the OutputStream
     */
    default void writeHeader(DataOutputStream out) throws IOException {
    }

    /**
     * <p>
     * Serializes an Edit Record to the log via the given
     * {@link DataOutputStream}.
     * </p>
     *
     * @param previousRecordState previous state
     * @param newRecordState new state
     * @param out stream to write to
     * @throws IOException if fail during write
     */
    void serializeEdit(T previousRecordState, T newRecordState, DataOutputStream out) throws IOException;

    /**
     * <p>
     * Serializes a Record in a form suitable for a Snapshot via the given
     * {@link DataOutputStream}.
     * </p>
     *
     * @param record to serialize
     * @param out to write to
     * @throws IOException if failed to write
     */
    void serializeRecord(T record, DataOutputStream out) throws IOException;

    /**
     * Provides the SerDe the opportunity to read header information before deserializing any records.
     * The default implementation reads nothing.
     *
     * @param in the InputStream to read from
     * @throws IOException if unable to read from the InputStream
     */
    default void readHeader(DataInputStream in) throws IOException {
    }

    /**
     * <p>
     * Reads an Edit Record from the given {@link DataInputStream} and merges
     * that edit with the current version of the record, returning the new,
     * merged version. If the Edit Record indicates that the entity was deleted,
     * must return a Record with an UpdateType of {@link UpdateType#DELETE}.
     * This method must never return <code>null</code>.
     * </p>
     *
     * @param in to deserialize from
     * @param currentRecordStates an unmodifiable map of Record ID's to the
     * current state of that record
     * @param version the version of the SerDe that was used to serialize the
     * edit record
     * @return deserialized record
     * @throws IOException if failure reading
     */
    T deserializeEdit(DataInputStream in, Map<Object, T> currentRecordStates, int version) throws IOException;

    /**
     * <p>
     * Reads a Record from the given {@link DataInputStream} and returns this
     * record. If no data is available, returns <code>null</code>.
     * </p>
     *
     * @param in stream to read from
     * @param version the version of the SerDe that was used to serialize the
     * record
     * @return record
     * @throws IOException failure reading
     */
    T deserializeRecord(DataInputStream in, int version) throws IOException;

    /**
     * Returns the unique ID for the given record
     *
     * @param record to obtain identifier for
     * @return identifier of record
     */
    Object getRecordIdentifier(T record);

    /**
     * Returns the UpdateType for the given record
     *
     * @param record to retrieve update type for
     * @return update type
     */
    UpdateType getUpdateType(T record);

    /**
     * Returns the external location of the given record; this is used when a
     * record is moved away from WALI or is being re-introduced to WALI. For
     * example, WALI can be updated with a record of type
     * {@link UpdateType#SWAP_OUT} that indicates a Location of
     * file://tmp/external1 and can then be re-introduced to WALI by updating
     * WALI with a record of type {@link UpdateType#CREATE} that indicates a
     * Location of file://tmp/external1
     *
     * @param record to get location of
     * @return location
     */
    String getLocation(T record);

    /**
     * Returns the version that this SerDe will use when writing. This is used
     * when serializing/deserializing the edit logs so that if the version
     * changes, we are still able to deserialize old versions
     *
     * @return version
     */
    int getVersion();

    /**
     * Closes any resources that the SerDe is holding open.
     * The default implementation does nothing.
     *
     * @throws IOException if unable to close resources
     */
    default void close() throws IOException {
    }

    /**
     * Optional method that a SerDe can support that indicates that the contents of the next update should be found
     * in the given external File. The default implementation does not support this operation.
     *
     * @param externalFile the file that contains the update information
     * @param out the DataOutputStream to write the external file reference to
     * @throws IOException if unable to write the update
     * @throws UnsupportedOperationException if this SerDe does not support this operation
     */
    default void writeExternalFileReference(File externalFile, DataOutputStream out) throws IOException {
        throw new UnsupportedOperationException();
    }

    /**
     * Indicates whether or not a call to {@link #writeExternalFileReference(File, DataOutputStream)} is valid for this implementation
     * @return <code>true</code> if calls to {@link #writeExternalFileReference(File, DataOutputStream)} are supported, <code>false</code> if calling
     * the method will result in an {@link UnsupportedOperationException} being thrown.
     */
    default boolean isWriteExternalFileReferenceSupported() {
        return false;
    }

    /**
     * If the last call to read data from this SerDe resulted in data being read from an External File, and there is more data in that External File,
     * then this method will return <code>true</code>. Otherwise, it will return <code>false</code>.
     * The default implementation always returns <code>false</code>.
     *
     * @return <code>true</code> if more data available in External File, <code>false</code> otherwise.
     * @throws IOException if unable to read from External File to determine data availability
     */
    default boolean isMoreInExternalFile() throws IOException {
        return false;
    }
}

View file

@ -0,0 +1,43 @@
package org.xbib.event.wal;
/**
 * A factory for creating {@link SerDe} instances, which also exposes per-record metadata
 * (identifier, update type, swap location) so callers can answer those questions without
 * first constructing a SerDe.
 *
 * @param <T> the type of record the produced SerDes handle
 */
public interface SerDeFactory<T> {

    /**
     * Returns a new SerDe
     *
     * @param encodingName the name of encoding that was used when writing the serialized data, or <code>null</code> if
     *            the SerDe is to be used for serialization purposes
     * @return a SerDe
     */
    SerDe<T> createSerDe(String encodingName);

    /**
     * Returns the unique ID for the given record
     *
     * @param record to obtain identifier for
     * @return identifier of record
     */
    Object getRecordIdentifier(T record);

    /**
     * Returns the UpdateType for the given record
     *
     * @param record to retrieve update type for
     * @return update type
     */
    UpdateType getUpdateType(T record);

    /**
     * Returns the external location of the given record; this is used when a
     * record is moved away from WALI or is being re-introduced to WALI. For
     * example, WALI can be updated with a record of type
     * {@link UpdateType#SWAP_OUT} that indicates a Location of
     * file://tmp/external1 and can then be re-introduced to WALI by updating
     * WALI with a record of type {@link UpdateType#CREATE} that indicates a
     * Location of file://tmp/external1
     *
     * @param record to get location of
     * @return location
     */
    String getLocation(T record);
}

View file

@ -0,0 +1,12 @@
package org.xbib.event.wal;
import java.util.Map;
import java.util.Set;
/**
 * A point-in-time view of the records and swap-file locations that belong in a snapshot,
 * together with the highest transaction ID that the view covers.
 *
 * @param <T> the type of record captured
 */
public interface SnapshotCapture<T> {

    /** @return the records to be written to the snapshot, keyed by record identifier */
    Map<Object, T> getRecords();

    /** @return the highest transaction ID encapsulated by this capture */
    long getMaxTransactionId();

    /** @return the swap-file locations known at the time of the capture */
    Set<String> getSwapLocations();
}

View file

@ -0,0 +1,41 @@
package org.xbib.event.wal;
import java.io.File;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
/**
 * The outcome of restoring state from a snapshot file: the records and swap locations that
 * were recovered, the file they came from, and the highest transaction ID the snapshot covers.
 *
 * @param <T> the type of record recovered
 */
public interface SnapshotRecovery<T> {

    /** @return the highest transaction ID covered by the snapshot, or -1 if no snapshot was recovered */
    long getMaxTransactionId();

    /** @return the records recovered from the snapshot, keyed by record identifier */
    Map<Object, T> getRecords();

    /** @return the swap-file locations recovered from the snapshot */
    Set<String> getRecoveredSwapLocations();

    /** @return the file the snapshot was recovered from, or <code>null</code> if there was none */
    File getRecoveryFile();

    /**
     * @param <T> the record type
     * @return a recovery representing "no snapshot found": no records, no swap locations,
     *         no recovery file, and a max transaction ID of -1
     */
    public static <T> SnapshotRecovery<T> emptyRecovery() {
        return new SnapshotRecovery<T>() {
            @Override
            public long getMaxTransactionId() {
                return -1L;
            }

            @Override
            public Map<Object, T> getRecords() {
                return Collections.emptyMap();
            }

            @Override
            public Set<String> getRecoveredSwapLocations() {
                return Collections.emptySet();
            }

            @Override
            public File getRecoveryFile() {
                return null;
            }
        };
    }
}

View file

@ -0,0 +1,28 @@
package org.xbib.event.wal;
/**
 * Immutable {@link JournalRecovery} implementation that carries the results of replaying
 * a single journal file: how many updates were applied, the highest transaction ID seen,
 * and whether recovery was cut short by an unexpected End-of-File.
 */
public class StandardJournalRecovery implements JournalRecovery {

    private final int recoveredUpdateCount;
    private final long highestTransactionId;
    private final boolean encounteredEof;

    public StandardJournalRecovery(final int updateCount, final long maxTransactionId, final boolean eofException) {
        this.recoveredUpdateCount = updateCount;
        this.highestTransactionId = maxTransactionId;
        this.encounteredEof = eofException;
    }

    @Override
    public int getUpdateCount() {
        return recoveredUpdateCount;
    }

    @Override
    public long getMaxTransactionId() {
        return highestTransactionId;
    }

    @Override
    public boolean isEOFExceptionEncountered() {
        return encounteredEof;
    }
}

View file

@ -0,0 +1,29 @@
package org.xbib.event.wal;
/**
 * Immutable {@link JournalSummary} describing a journal: the first and last transaction IDs
 * it contains and the total number of transactions written to it.
 */
public class StandardJournalSummary implements JournalSummary {

    private final long minTransactionId;
    private final long maxTransactionId;
    private final int totalTransactions;

    public StandardJournalSummary(final long firstTransactionId, final long lastTransactionId, final int transactionCount) {
        this.minTransactionId = firstTransactionId;
        this.maxTransactionId = lastTransactionId;
        this.totalTransactions = transactionCount;
    }

    @Override
    public long getFirstTransactionId() {
        return minTransactionId;
    }

    @Override
    public long getLastTransactionId() {
        return maxTransactionId;
    }

    @Override
    public int getTransactionCount() {
        return totalTransactions;
    }
}

View file

@ -0,0 +1,39 @@
package org.xbib.event.wal;
import java.io.File;
import java.util.Map;
import java.util.Set;
/**
 * An immutable {@link SnapshotRecovery} that simply hands back the values it
 * was constructed with.
 *
 * @param <T> the type of record that was recovered
 */
public class StandardSnapshotRecovery<T> implements SnapshotRecovery<T> {

    private final Map<Object, T> records;

    private final Set<String> swapLocations;

    private final File sourceFile;

    private final long highestTransactionId;

    /**
     * @param recordMap the recovered records, keyed by record identifier
     * @param recoveredSwapLocations the swap locations recorded in the snapshot
     * @param recoveryFile the file the snapshot was recovered from
     * @param maxTransactionId the highest transaction ID covered by the snapshot
     */
    public StandardSnapshotRecovery(final Map<Object, T> recordMap, final Set<String> recoveredSwapLocations, final File recoveryFile, final long maxTransactionId) {
        this.records = recordMap;
        this.swapLocations = recoveredSwapLocations;
        this.sourceFile = recoveryFile;
        this.highestTransactionId = maxTransactionId;
    }

    @Override
    public long getMaxTransactionId() {
        return highestTransactionId;
    }

    @Override
    public Map<Object, T> getRecords() {
        return records;
    }

    @Override
    public Set<String> getRecoveredSwapLocations() {
        return swapLocations;
    }

    @Override
    public File getRecoveryFile() {
        return sourceFile;
    }
}

View file

@ -0,0 +1,56 @@
package org.xbib.event.wal;
/**
 * <p>
 * Provides a callback mechanism by which applicable listeners can be notified
 * when a WriteAheadRepository is synched (via the
 * {@link WriteAheadRepository#sync()} method) or one of its partitions is
 * synched via
 * {@link WriteAheadRepository#update(java.util.Collection, boolean)} with a
 * value of <code>true</code> for the second argument.
 * </p>
 *
 * <p>
 * It is not required that an implementation of {@link WriteAheadRepository}
 * support this interface. Those that do generally will require that the
 * listener be injected via the constructor.
 * </p>
 *
 * <p>
 * All implementations of this interface must be thread-safe.
 * </p>
 *
 * <p>
 * The {@link #onSync(int)} method will always be called while the associated
 * partition is locked. The {@link #onGlobalSync()} will always be called while
 * the entire repository is locked.
 * </p>
 */
public interface SyncListener {

    /**
     * A no-op listener for use when no sync notifications are needed.
     * Interface fields are implicitly public, static and final, so the
     * redundant modifiers have been omitted.
     */
    SyncListener NOP_SYNC_LISTENER = new SyncListener() {
        @Override
        public void onSync(final int partitionIndex) {
        }

        @Override
        public void onGlobalSync() {
        }
    };

    /**
     * This method is called whenever a specific partition is synched via the
     * {@link WriteAheadRepository#update(java.util.Collection, boolean)} method
     *
     * @param partitionIndex the index of the partition that was synched
     */
    void onSync(int partitionIndex);

    /**
     * This method is called whenever the entire
     * <code>WriteAheadRepository</code> is synched via the
     * {@link WriteAheadRepository#sync()} method.
     */
    void onGlobalSync();
}

View file

@ -0,0 +1,33 @@
package org.xbib.event.wal;
/**
 * <p>
 * Enumerates the valid types of things that can cause a
 * {@link WriteAheadRepository} to update its state</p>
 */
public enum UpdateType {

    /**
     * Used when a new Record has been created
     */
    CREATE,

    /**
     * Used when a Record has been updated in some way
     */
    UPDATE,

    /**
     * Used to indicate that a Record has been deleted and should be removed
     * from the Repository
     */
    DELETE,

    /**
     * Used to indicate that a Record still exists but has been moved elsewhere,
     * so that it is no longer maintained by the WALI instance
     */
    SWAP_OUT,

    /**
     * Used to indicate that a Record that was previously Swapped Out is now
     * being Swapped In
     */
    SWAP_IN
}

View file

@ -0,0 +1,44 @@
package org.xbib.event.wal;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
/**
 * A journal of updates that can be replayed on restart to recover the state of
 * a write-ahead repository.
 *
 * @param <T> the type of record stored in the journal
 */
public interface WriteAheadJournal<T> extends Closeable {

    /**
     * Replays this journal's updates into the supplied record map and
     * swap-location set
     * (NOTE(review): behavior inferred from the signature — confirm against implementations).
     *
     * @param recordMap the records recovered so far, keyed by record identifier
     * @param swapLocations the swap locations recovered so far
     * @return a JournalRecovery describing what was recovered
     * @throws IOException if unable to read from the underlying storage mechanism
     */
    JournalRecovery recoverRecords(Map<Object, T> recordMap, Set<String> swapLocations) throws IOException;

    /**
     * Updates the journal with the given set of records
     *
     * @param records the records to update
     * @param recordLookup a lookup that can be used to access the current value of a record, given its ID
     *
     * @throws IOException if unable to write to the underlying storage mechanism
     */
    void update(Collection<T> records, RecordLookup<T> recordLookup) throws IOException;

    /**
     * Writes the journal's header to the underlying storage mechanism.
     *
     * @throws IOException if unable to write to the underlying storage mechanism
     */
    void writeHeader() throws IOException;

    /**
     * Forces any buffered journal data to be flushed to the underlying storage
     * device.
     *
     * @throws IOException if unable to write to the underlying storage mechanism
     */
    void fsync() throws IOException;

    /**
     * Returns information about what was written to the journal.
     * (NOTE(review): the original javadoc declared "@throws IOException", but the
     * method has no throws clause, so that tag has been removed.)
     *
     * @return A JournalSummary indicating what was written to the journal
     */
    JournalSummary getSummary();

    /**
     * @return <code>true</code> if the journal is healthy and can be written to, <code>false</code> if either the journal has been closed or is poisoned
     */
    boolean isHealthy();

    /**
     * Destroys any resources that the journal occupies
     */
    void dispose();
}

View file

@ -0,0 +1,106 @@
package org.xbib.event.wal;
import java.io.IOException;
import java.util.Collection;
import java.util.Set;
/**
 * <p>
 * A WriteAheadRepository is used to persist state that is otherwise kept
 * in-memory. The Repository does not provide any query capability except to
 * allow the data to be recovered upon restart of the system.
 * </p>
 *
 * <p>
 * A WriteAheadRepository operates by writing every update to an Edit Log. On
 * restart, the data can be recovered by replaying all of the updates that are
 * found in the Edit Log. This can, however, eventually result in very large
 * Edit Logs, which can both take up massive amounts of disk space and take a
 * long time to recover. In order to prevent this, the Repository provides a
 * Checkpointing capability. This allows the current in-memory state of the
 * Repository to be flushed to disk and the Edit Log to be deleted, thereby
 * compacting the amount of space required to store the Repository. After a
 * Checkpoint is performed, modifications are again written to an Edit Log. At
 * this point, when the system is to be restored, it is restored by first
 * loading the Checkpointed version of the Repository and then replaying the
 * Edit Log.
 * </p>
 *
 * <p>
 * All implementations of <code>WriteAheadRepository</code> use one or more
 * partitions to manage their Edit Logs. An implementation may require exactly
 * one partition or may allow many partitions.
 * </p>
 *
 * @param <T> the type of Record this repository is for
 */
public interface WriteAheadRepository<T> {

    /**
     * <p>
     * Updates the repository with the specified Records. The Collection must
     * not contain multiple records with the same ID
     * </p>
     *
     * @param records the records to update
     * @param forceSync specifies whether or not the Repository forces the data
     * to be flushed to disk. If false, the data may be stored in Operating
     * System buffers, which improves performance but could cause loss of data
     * if power is lost or the Operating System crashes
     * @throws IOException if failure to update repo
     * @throws IllegalArgumentException if multiple records within the given
     * Collection have the same ID, as specified by {@link SerDe#getRecordIdentifier(Object)}
     * method
     *
     * @return the index of the Partition that performed the update
     */
    int update(Collection<T> records, boolean forceSync) throws IOException;

    /**
     * <p>
     * Recovers all records from the persisted state. This method must be called
     * before any updates are issued to the Repository.
     * </p>
     *
     * @return recovered records
     * @throws IOException if failure to read from repo
     * @throws IllegalStateException if any updates have been issued against
     * this Repository before this method is invoked
     */
    Collection<T> recoverRecords() throws IOException;

    /**
     * <p>
     * Recovers all External Swap locations that were persisted. If this method
     * is to be called, it must be called AFTER {@link #recoverRecords()} and
     * BEFORE {@link #update(Collection, boolean)}.
     * </p>
     *
     * @return swap locations
     * @throws IOException if failure reading swap locations
     */
    Set<String> getRecoveredSwapLocations() throws IOException;

    /**
     * <p>
     * Compacts the contents of the Repository so that rather than having a
     * Snapshot and an Edit Log indicating many Updates to the Snapshot, the
     * Snapshot is updated to contain the current state of the Repository, and
     * the edit log is purged.
     * </p>
     *
     * @return the number of records that were written to the new snapshot
     * @throws java.io.IOException if failure during checkpoint
     */
    int checkpoint() throws IOException;

    /**
     * <p>
     * Causes the repository to checkpoint and then close any open resources.
     * </p>
     *
     * @throws IOException if failure to shutdown cleanly
     */
    void shutdown() throws IOException;
}

View file

@ -0,0 +1,19 @@
package org.xbib.event.wal;
import java.io.IOException;
import java.util.Collection;
import java.util.Set;
/**
 * Maintains the in-memory state needed to write snapshots of a write-ahead
 * repository, and recovers that state from a previously written snapshot.
 *
 * @param <T> the type of record stored in the snapshot
 */
public interface WriteAheadSnapshot<T> {

    /**
     * Prepares a capture of the current records so that they can be written
     * out as a snapshot.
     *
     * @param maxTransactionId the highest transaction ID to associate with the capture
     * @return the prepared capture
     */
    SnapshotCapture<T> prepareSnapshot(long maxTransactionId);

    /**
     * Prepares a capture of the current records, recording the given swap
     * locations alongside them.
     *
     * @param maxTransactionId the highest transaction ID to associate with the capture
     * @param swapLocations the swap locations to include in the capture
     * @return the prepared capture
     */
    SnapshotCapture<T> prepareSnapshot(long maxTransactionId, Set<String> swapLocations);

    /**
     * Writes the given capture to the underlying storage mechanism.
     *
     * @param snapshot the capture to write
     * @throws IOException if unable to write the snapshot
     */
    void writeSnapshot(SnapshotCapture<T> snapshot) throws IOException;

    /**
     * Recovers the snapshot state from the underlying storage mechanism.
     *
     * @return the result of the recovery
     * @throws IOException if unable to read the snapshot
     */
    SnapshotRecovery<T> recover() throws IOException;

    /**
     * Updates the in-memory snapshot state with the given records.
     *
     * @param records the records to apply
     */
    void update(Collection<T> records);

    /**
     * @return the number of records currently tracked by this snapshot
     */
    int getRecordCount();
}