From 44b7ae7de2f3f1c3bb42e5cac8c9b8009baeb86d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Tue, 21 Sep 2021 18:08:31 +0200 Subject: [PATCH] add queue tape --- datastructures-queue-tape/build.gradle | 14 + .../src/main/java/module-info.java | 3 + .../queue/tape/FileObjectQueue.java | 122 +++ .../queue/tape/InMemoryObjectQueue.java | 127 +++ .../queue/tape/ObjectQueue.java | 63 ++ .../datastructures/queue/tape/QueueFile.java | 544 ++++++++++ .../com/squareup/tape2/ObjectQueueTest.java | 231 ++++ .../squareup/tape2/QueueFileLoadingTest.java | 103 ++ .../com/squareup/tape2/QueueFileTest.java | 994 ++++++++++++++++++ .../com/squareup/tape2/QueueTestUtils.java | 46 + .../src/test/resources/empty-serialized-queue | Bin 0 -> 4096 bytes .../test/resources/one-entry-serialized-queue | Bin 0 -> 4096 bytes .../truncated-empty-serialized-queue | Bin 0 -> 1024 bytes .../truncated-one-entry-serialized-queue | Bin 0 -> 1029 bytes settings.gradle | 1 + 15 files changed, 2248 insertions(+) create mode 100644 datastructures-queue-tape/build.gradle create mode 100644 datastructures-queue-tape/src/main/java/module-info.java create mode 100644 datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/FileObjectQueue.java create mode 100644 datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/InMemoryObjectQueue.java create mode 100644 datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/ObjectQueue.java create mode 100644 datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/QueueFile.java create mode 100644 datastructures-queue-tape/src/test/java/com/squareup/tape2/ObjectQueueTest.java create mode 100644 datastructures-queue-tape/src/test/java/com/squareup/tape2/QueueFileLoadingTest.java create mode 100644 datastructures-queue-tape/src/test/java/com/squareup/tape2/QueueFileTest.java create mode 100644 datastructures-queue-tape/src/test/java/com/squareup/tape2/QueueTestUtils.java create mode 100644 datastructures-queue-tape/src/test/resources/empty-serialized-queue create mode 100644 datastructures-queue-tape/src/test/resources/one-entry-serialized-queue create mode 100644 datastructures-queue-tape/src/test/resources/truncated-empty-serialized-queue create mode 100644 datastructures-queue-tape/src/test/resources/truncated-one-entry-serialized-queue diff --git a/datastructures-queue-tape/build.gradle b/datastructures-queue-tape/build.gradle new file mode 100644 index 0000000..e60fa4c --- /dev/null +++ b/datastructures-queue-tape/build.gradle @@ -0,0 +1,14 @@ +dependencies { + testCompileOnly 'com.google.code.findbugs:jsr305:3.0.2' + testImplementation 'junit:junit:4.13.2' + testImplementation 'com.google.truth:truth:0.32' + testImplementation 'com.squareup.burst:burst-junit4:1.1.1' + testImplementation 'com.squareup.okio:okio:1.13.0' + testRuntimeOnly "org.junit.vintage:junit-vintage-engine:5.7.2" +} + +test { + useJUnitPlatform { + includeEngines 'junit-jupiter', 'junit-vintage' + } +} \ No newline at end of file diff --git a/datastructures-queue-tape/src/main/java/module-info.java b/datastructures-queue-tape/src/main/java/module-info.java new file mode 100644 index 0000000..e6e4da6 --- /dev/null +++ b/datastructures-queue-tape/src/main/java/module-info.java @@ -0,0 +1,3 @@ +module org.xbib.datastructures.queue.tape { + exports org.xbib.datastructures.queue.tape; +} diff --git a/datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/FileObjectQueue.java b/datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/FileObjectQueue.java new file mode 100644 index 0000000..893c711 --- /dev/null +++ b/datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/FileObjectQueue.java @@ -0,0 +1,122 @@ +package org.xbib.datastructures.queue.tape; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Iterator; + +public final class FileObjectQueue extends ObjectQueue { + + private final QueueFile queueFile; + + private final DirectByteArrayOutputStream bytes = new DirectByteArrayOutputStream(); + + final Converter converter; + + public FileObjectQueue(QueueFile queueFile, Converter converter) { + this.queueFile = queueFile; + this.converter = converter; + } + + @Override + public QueueFile file() { + return queueFile; + } + + @Override + public int size() { + return queueFile.size(); + } + + @Override + public boolean isEmpty() { + return queueFile.isEmpty(); + } + + @Override + public void add(T entry) throws IOException { + bytes.reset(); + converter.toStream(entry, bytes); + queueFile.add(bytes.getArray(), 0, bytes.size()); + } + + @Override + public T peek() throws IOException { + byte[] bytes = queueFile.peek(); + if (bytes == null) return null; + return converter.from(bytes); + } + + @Override + public void remove() throws IOException { + queueFile.remove(); + } + + @Override + public void remove(int n) throws IOException { + queueFile.remove(n); + } + + @Override + public void clear() throws IOException { + queueFile.clear(); + } + + @Override + public void close() throws IOException { + queueFile.close(); + } + + @Override + public Iterator iterator() { + return new QueueFileIterator(queueFile.iterator()); + } + + @Override + public String toString() { + return "FileObjectQueue{" + "queueFile=" + queueFile + '}'; + } + + private final class QueueFileIterator implements Iterator { + final Iterator iterator; + + QueueFileIterator(Iterator iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + byte[] data = iterator.next(); + try { + return converter.from(data); + } catch (IOException e) { + throw QueueFile.getSneakyThrowable(e); + } + } + + @Override + public void remove() { + iterator.remove(); + } + } + + /** + * Enables direct access to the internal array. Avoids unnecessary copying. + */ + private static final class DirectByteArrayOutputStream extends ByteArrayOutputStream { + DirectByteArrayOutputStream() { + } + + /** + * Gets a reference to the internal byte array. The {@link #size()} method indicates how many + * bytes contain actual data added since the last {@link #reset()} call. + */ + byte[] getArray() { + return buf; + } + } +} diff --git a/datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/InMemoryObjectQueue.java b/datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/InMemoryObjectQueue.java new file mode 100644 index 0000000..7f1f246 --- /dev/null +++ b/datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/InMemoryObjectQueue.java @@ -0,0 +1,127 @@ +package org.xbib.datastructures.queue.tape; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public final class InMemoryObjectQueue extends ObjectQueue { + + private final Deque entries; + int modCount = 0; + boolean closed; + + public InMemoryObjectQueue() { + entries = new ArrayDeque<>(); + } + + @Override + public QueueFile file() { + return null; + } + + @Override + public void add(T entry) { + if (closed) throw new IllegalStateException("closed"); + modCount++; + entries.addLast(entry); + } + + @Override + public T peek() { + if (closed) throw new IllegalStateException("closed"); + return entries.peekFirst(); + } + + @Override + public List asList() { + return Collections.unmodifiableList(new ArrayList<>(entries)); + } + + @Override + public int size() { + return entries.size(); + } + + @Override + public void remove() { + remove(1); + } + + @Override + public void remove(int n) { + if (closed) throw new IllegalStateException("closed"); + modCount++; + for (int i = 0; i < n; i++) { + entries.removeFirst(); + } + } + + @Override + public Iterator iterator() { + return new EntryIterator(entries.iterator()); + } + + @Override + public void close() { + closed = true; + } + + @Override + public String toString() { + return "InMemoryObjectQueue{" + + "size=" + entries.size() + + '}'; + } + + private final class EntryIterator implements Iterator { + private final Iterator delegate; + private int index = 0; + + private int expectedModCount = modCount; + + EntryIterator(Iterator delegate) { + this.delegate = delegate; + } + + @Override + public boolean hasNext() { + checkForComodification(); + return delegate.hasNext(); + } + + @Override + public T next() { + if (closed) throw new IllegalStateException("closed"); + checkForComodification(); + + T next = delegate.next(); + index += 1; + return next; + } + + @Override + public void remove() { + if (closed) throw new IllegalStateException("closed"); + checkForComodification(); + + if (size() == 0) throw new NoSuchElementException(); + if (index != 1) { + throw new UnsupportedOperationException("Removal is only permitted from the head."); + } + + InMemoryObjectQueue.this.remove(); + + expectedModCount = modCount; + index -= 1; + } + + private void checkForComodification() { + if (modCount != expectedModCount) throw new ConcurrentModificationException(); + } + } +} diff --git a/datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/ObjectQueue.java b/datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/ObjectQueue.java new file mode 100644 index 0000000..44b4f39 --- /dev/null +++ b/datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/ObjectQueue.java @@ -0,0 +1,63 @@ +package org.xbib.datastructures.queue.tape; + +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +public abstract class ObjectQueue implements Iterable, Closeable { + + public static ObjectQueue create(QueueFile qf, Converter converter) { + return new FileObjectQueue<>(qf, converter); + } + + public static ObjectQueue createInMemory() { + return new InMemoryObjectQueue<>(); + } + + public abstract QueueFile file(); + + public abstract int size(); + + public boolean isEmpty() { + return size() == 0; + } + + public abstract void add(T entry) throws IOException; + + public abstract T peek() throws IOException; + + public List peek(int max) { + int end = Math.min(max, size()); + List subList = new ArrayList<>(end); + Iterator iterator = iterator(); + for (int i = 0; i < end; i++) { + subList.add(iterator.next()); + } + return Collections.unmodifiableList(subList); + } + + public List asList() { + return peek(size()); + } + + public void remove() throws IOException { + remove(1); + } + + public abstract void remove(int n) throws IOException; + + public void clear() throws IOException { + remove(size()); + } + + public interface Converter { + + T from(byte[] source) throws IOException; + + void toStream(T value, OutputStream sink) throws IOException; + } +} diff --git a/datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/QueueFile.java b/datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/QueueFile.java new file mode 100644 index 0000000..5dcd685 --- /dev/null +++ b/datastructures-queue-tape/src/main/java/org/xbib/datastructures/queue/tape/QueueFile.java @@ -0,0 +1,544 @@ +package org.xbib.datastructures.queue.tape; + +import java.io.Closeable; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import static java.lang.Math.min; + +public final class QueueFile implements Closeable, Iterable { + + private static final int VERSIONED_HEADER = 0x80000001; + + public static final int INITIAL_LENGTH = 4096; // one file system block + + private static final byte[] ZEROES = new byte[INITIAL_LENGTH]; + + public final RandomAccessFile raf; + + final File file; + + final boolean versioned; + + final int headerLength; + + public long fileLength; + + int elementCount; + + Element first; + + private Element last; + + private final byte[] buffer = new byte[32]; + + int modCount = 0; + + private final boolean zero; + + boolean closed; + + static RandomAccessFile initializeFromFile(File file, boolean forceLegacy) throws IOException { + if (!file.exists()) { + File tempFile = new File(file.getPath() + ".tmp"); + try (RandomAccessFile raf = open(tempFile)) { + raf.setLength(INITIAL_LENGTH); + raf.seek(0); + if (forceLegacy) { + raf.writeInt(INITIAL_LENGTH); + } else { + raf.writeInt(VERSIONED_HEADER); + raf.writeLong(INITIAL_LENGTH); + } + } + if (!tempFile.renameTo(file)) { + throw new IOException("Rename failed!"); + } + } + return open(file); + } + + private static RandomAccessFile open(File file) throws FileNotFoundException { + return new RandomAccessFile(file, "rwd"); + } + + public QueueFile(File file, RandomAccessFile raf, boolean zero, boolean forceLegacy) throws IOException { + this.file = file; + this.raf = raf; + this.zero = zero; + raf.seek(0); + raf.readFully(buffer); + versioned = !forceLegacy && (buffer[0] & 0x80) != 0; + long firstOffset; + long lastOffset; + if (versioned) { + headerLength = 32; + int version = readInt(buffer, 0) & 0x7FFFFFFF; + if (version != 1) { + throw new IOException("Unable to read version " + version + " format. Supported versions are 1 and legacy."); + } + fileLength = readLong(buffer, 4); + elementCount = readInt(buffer, 12); + firstOffset = readLong(buffer, 16); + lastOffset = readLong(buffer, 24); + } else { + headerLength = 16; + fileLength = readInt(buffer, 0); + elementCount = readInt(buffer, 4); + firstOffset = readInt(buffer, 8); + lastOffset = readInt(buffer, 12); + } + if (fileLength > raf.length()) { + throw new IOException("File is truncated. Expected length: " + fileLength + ", Actual length: " + raf.length()); + } else if (fileLength <= headerLength) { + throw new IOException("File is corrupt; length stored in header (" + fileLength + ") is invalid."); + } + first = readElement(firstOffset); + last = readElement(lastOffset); + } + + private static void writeInt(byte[] buffer, int offset, int value) { + buffer[offset] = (byte) (value >> 24); + buffer[offset + 1] = (byte) (value >> 16); + buffer[offset + 2] = (byte) (value >> 8); + buffer[offset + 3] = (byte) value; + } + + private static int readInt(byte[] buffer, int offset) { + return ((buffer[offset] & 0xff) << 24) + + ((buffer[offset + 1] & 0xff) << 16) + + ((buffer[offset + 2] & 0xff) << 8) + + (buffer[offset + 3] & 0xff); + } + + private static void writeLong(byte[] buffer, int offset, long value) { + buffer[offset] = (byte) (value >> 56); + buffer[offset + 1] = (byte) (value >> 48); + buffer[offset + 2] = (byte) (value >> 40); + buffer[offset + 3] = (byte) (value >> 32); + buffer[offset + 4] = (byte) (value >> 24); + buffer[offset + 5] = (byte) (value >> 16); + buffer[offset + 6] = (byte) (value >> 8); + buffer[offset + 7] = (byte) value; + } + + private static long readLong(byte[] buffer, int offset) { + return ((buffer[offset] & 0xffL) << 56) + + ((buffer[offset + 1] & 0xffL) << 48) + + ((buffer[offset + 2] & 0xffL) << 40) + + ((buffer[offset + 3] & 0xffL) << 32) + + ((buffer[offset + 4] & 0xffL) << 24) + + ((buffer[offset + 5] & 0xffL) << 16) + + ((buffer[offset + 6] & 0xffL) << 8) + + (buffer[offset + 7] & 0xffL); + } + + private void writeHeader(long fileLength, int elementCount, long firstPosition, long lastPosition) + throws IOException { + raf.seek(0); + + if (versioned) { + writeInt(buffer, 0, VERSIONED_HEADER); + writeLong(buffer, 4, fileLength); + writeInt(buffer, 12, elementCount); + writeLong(buffer, 16, firstPosition); + writeLong(buffer, 24, lastPosition); + raf.write(buffer, 0, 32); + return; + } + + // Legacy queue header. + writeInt(buffer, 0, (int) fileLength); // Signed, so leading bit is always 0 aka legacy. + writeInt(buffer, 4, elementCount); + writeInt(buffer, 8, (int) firstPosition); + writeInt(buffer, 12, (int) lastPosition); + raf.write(buffer, 0, 16); + } + + Element readElement(long position) throws IOException { + if (position == 0) return Element.NULL; + ringRead(position, buffer, 0, Element.HEADER_LENGTH); + int length = readInt(buffer, 0); + return new Element(position, length); + } + + long wrapPosition(long position) { + return position < fileLength ? position + : headerLength + position - fileLength; + } + + private void ringWrite(long position, byte[] buffer, int offset, int count) throws IOException { + position = wrapPosition(position); + if (position + count <= fileLength) { + raf.seek(position); + raf.write(buffer, offset, count); + } else { + int beforeEof = (int) (fileLength - position); + raf.seek(position); + raf.write(buffer, offset, beforeEof); + raf.seek(headerLength); + raf.write(buffer, offset + beforeEof, count - beforeEof); + } + } + + private void ringErase(long position, long length) throws IOException { + while (length > 0) { + int chunk = (int) min(length, ZEROES.length); + ringWrite(position, ZEROES, 0, chunk); + length -= chunk; + position += chunk; + } + } + + void ringRead(long position, byte[] buffer, int offset, int count) throws IOException { + position = wrapPosition(position); + if (position + count <= fileLength) { + raf.seek(position); + raf.readFully(buffer, offset, count); + } else { + int beforeEof = (int) (fileLength - position); + raf.seek(position); + raf.readFully(buffer, offset, beforeEof); + raf.seek(headerLength); + raf.readFully(buffer, offset + beforeEof, count - beforeEof); + } + } + + public void add(byte[] data) throws IOException { + add(data, 0, data.length); + } + + public void add(byte[] data, int offset, int count) throws IOException { + if (data == null) { + throw new NullPointerException("data == null"); + } + if ((offset | count) < 0 || count > data.length - offset) { + throw new IndexOutOfBoundsException(); + } + if (closed) throw new IllegalStateException("closed"); + expandIfNecessary(count); + boolean wasEmpty = isEmpty(); + long position = wasEmpty ? headerLength : wrapPosition(last.position + Element.HEADER_LENGTH + last.length); + Element newLast = new Element(position, count); + writeInt(buffer, 0, count); + ringWrite(newLast.position, buffer, 0, Element.HEADER_LENGTH); + ringWrite(newLast.position + Element.HEADER_LENGTH, data, offset, count); + long firstPosition = wasEmpty ? newLast.position : first.position; + writeHeader(fileLength, elementCount + 1, firstPosition, newLast.position); + last = newLast; + elementCount++; + modCount++; + if (wasEmpty) first = last; + } + + private long usedBytes() { + if (elementCount == 0) return headerLength; + if (last.position >= first.position) { + return (last.position - first.position) + + Element.HEADER_LENGTH + last.length + + headerLength; + } else { + return last.position + + Element.HEADER_LENGTH + last.length + + fileLength - first.position; + } + } + + private long remainingBytes() { + return fileLength - usedBytes(); + } + + public boolean isEmpty() { + return elementCount == 0; + } + + /** + * If necessary, expands the file to accommodate an additional element of the given length. + * + * @param dataLength length of data being added + */ + private void expandIfNecessary(long dataLength) throws IOException { + long elementLength = Element.HEADER_LENGTH + dataLength; + long remainingBytes = remainingBytes(); + if (remainingBytes >= elementLength) return; + + // Expand. + long previousLength = fileLength; + long newLength; + // Double the length until we can fit the new data. + do { + remainingBytes += previousLength; + newLength = previousLength << 1; + previousLength = newLength; + } while (remainingBytes < elementLength); + + setLength(newLength); + + // Calculate the position of the tail end of the data in the ring buffer + long endOfLastElement = wrapPosition(last.position + Element.HEADER_LENGTH + last.length); + long count = 0; + // If the buffer is split, we need to make it contiguous + if (endOfLastElement <= first.position) { + FileChannel channel = raf.getChannel(); + channel.position(fileLength); // destination position + count = endOfLastElement - headerLength; + if (channel.transferTo(headerLength, count, channel) != count) { + throw new AssertionError("Copied insufficient number of bytes!"); + } + } + + // Commit the expansion. + if (last.position < first.position) { + long newLastPosition = fileLength + last.position - headerLength; + writeHeader(newLength, elementCount, first.position, newLastPosition); + last = new Element(newLastPosition, last.length); + } else { + writeHeader(newLength, elementCount, first.position, last.position); + } + + fileLength = newLength; + + if (zero) { + ringErase(headerLength, count); + } + } + + /** + * Sets the length of the file. + */ + private void setLength(long newLength) throws IOException { + // Set new file length (considered metadata) and sync it to storage. + raf.setLength(newLength); + raf.getChannel().force(true); + } + + /** + * Reads the eldest element. Returns null if the queue is empty. + */ + public byte[] peek() throws IOException { + if (closed) throw new IllegalStateException("closed"); + if (isEmpty()) return null; + int length = first.length; + byte[] data = new byte[length]; + ringRead(first.position + Element.HEADER_LENGTH, data, 0, length); + return data; + } + + /** + * Returns an iterator over elements in this QueueFile. + * + *

The iterator disallows modifications to be made to the QueueFile during iteration. Removing + * elements from the head of the QueueFile is permitted during iteration using + * {@link Iterator#remove()}. + * + *

The iterator may throw an unchecked {@link IOException} during {@link Iterator#next()} + * or {@link Iterator#remove()}. + */ + @Override + public Iterator iterator() { + return new ElementIterator(); + } + + private final class ElementIterator implements Iterator { + int nextElementIndex = 0; + private long nextElementPosition = first.position; + int expectedModCount = modCount; + + ElementIterator() { + } + + private void checkForComodification() { + if (modCount != expectedModCount) throw new ConcurrentModificationException(); + } + + @Override + public boolean hasNext() { + if (closed) throw new IllegalStateException("closed"); + checkForComodification(); + return nextElementIndex != elementCount; + } + + @Override + public byte[] next() { + if (closed) throw new IllegalStateException("closed"); + checkForComodification(); + if (isEmpty()) throw new NoSuchElementException(); + if (nextElementIndex >= elementCount) throw new NoSuchElementException(); + try { + Element current = readElement(nextElementPosition); + byte[] buffer = new byte[current.length]; + nextElementPosition = wrapPosition(current.position + Element.HEADER_LENGTH); + ringRead(nextElementPosition, buffer, 0, current.length); + nextElementPosition = wrapPosition(current.position + Element.HEADER_LENGTH + current.length); + nextElementIndex++; + return buffer; + } catch (IOException e) { + throw QueueFile.getSneakyThrowable(e); + } + } + + @Override + public void remove() { + checkForComodification(); + if (isEmpty()) throw new NoSuchElementException(); + if (nextElementIndex != 1) { + throw new UnsupportedOperationException("Removal is only permitted from the head."); + } + try { + QueueFile.this.remove(); + } catch (IOException e) { + throw QueueFile.getSneakyThrowable(e); + } + expectedModCount = modCount; + nextElementIndex--; + } + } + + public int size() { + return elementCount; + } + + public void remove() throws IOException { + remove(1); + } + + public void remove(int n) throws IOException { + if (n < 0) { + throw new IllegalArgumentException("Cannot remove negative (" + n + ") number of elements."); + } + if (n == 0) { + return; + } + if (n == elementCount) { + clear(); + return; + } + if (isEmpty()) { + throw new NoSuchElementException(); + } + if (n > elementCount) { + throw new IllegalArgumentException("Cannot remove more elements (" + n + ") than present in queue (" + elementCount + ")."); + } + long eraseStartPosition = first.position; + long eraseTotalLength = 0; + long newFirstPosition = first.position; + int newFirstLength = first.length; + for (int i = 0; i < n; i++) { + eraseTotalLength += Element.HEADER_LENGTH + newFirstLength; + newFirstPosition = wrapPosition(newFirstPosition + Element.HEADER_LENGTH + newFirstLength); + ringRead(newFirstPosition, buffer, 0, Element.HEADER_LENGTH); + newFirstLength = readInt(buffer, 0); + } + writeHeader(fileLength, elementCount - n, newFirstPosition, last.position); + elementCount -= n; + modCount++; + first = new Element(newFirstPosition, newFirstLength); + if (zero) { + ringErase(eraseStartPosition, eraseTotalLength); + } + } + + public void clear() throws IOException { + if (closed) throw new IllegalStateException("closed"); + writeHeader(INITIAL_LENGTH, 0, 0, 0); + if (zero) { + raf.seek(headerLength); + raf.write(ZEROES, 0, INITIAL_LENGTH - headerLength); + } + elementCount = 0; + first = Element.NULL; + last = Element.NULL; + if (fileLength > INITIAL_LENGTH) setLength(INITIAL_LENGTH); + fileLength = INITIAL_LENGTH; + modCount++; + } + + public File file() { + return file; + } + + @Override + public void close() throws IOException { + closed = true; + raf.close(); + } + + @Override + public String toString() { + return "QueueFile{" + + "file=" + file + + ", zero=" + zero + + ", versioned=" + versioned + + ", length=" + fileLength + + ", size=" + elementCount + + ", first=" + first + + ", last=" + last + + '}'; + } + + public static class Element { + static final Element NULL = new Element(0, 0); + public static final int HEADER_LENGTH = 4; + final long position; + final int length; + public Element(long position, int length) { + this.position = position; + this.length = length; + } + + @Override + public String toString() { + return getClass().getSimpleName() + + "[position=" + position + + ", length=" + length + + "]"; + } + } + + public static final class Builder { + final File file; + boolean zero = true; + boolean forceLegacy = false; + + public Builder(File file) { + if (file == null) { + throw new NullPointerException("file == null"); + } + this.file = file; + } + + public Builder zero(boolean zero) { + this.zero = zero; + return this; + } + + public Builder forceLegacy(boolean forceLegacy) { + this.forceLegacy = forceLegacy; + return this; + } + + public QueueFile build() throws IOException { + RandomAccessFile raf = initializeFromFile(file, forceLegacy); + QueueFile qf = null; + try { + qf = new QueueFile(file, raf, zero, forceLegacy); + return qf; + } finally { + if (qf == null) { + raf.close(); + } + } + } + } + + @SuppressWarnings({"unchecked", "TypeParameterUnusedInFormals"}) + static T getSneakyThrowable(Throwable t) throws T { + throw (T) t; + } +} diff --git a/datastructures-queue-tape/src/test/java/com/squareup/tape2/ObjectQueueTest.java b/datastructures-queue-tape/src/test/java/com/squareup/tape2/ObjectQueueTest.java new file mode 100644 index 0000000..8410a73 --- /dev/null +++ b/datastructures-queue-tape/src/test/java/com/squareup/tape2/ObjectQueueTest.java @@ -0,0 +1,231 @@ +package com.squareup.tape2; + +import com.squareup.burst.BurstJUnit4; +import com.squareup.burst.annotation.Burst; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.xbib.datastructures.queue.tape.FileObjectQueue; +import org.xbib.datastructures.queue.tape.ObjectQueue; +import org.xbib.datastructures.queue.tape.QueueFile; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; + +@RunWith(BurstJUnit4.class) +public class ObjectQueueTest { + public enum QueueFactory { + FILE() { + @Override + public ObjectQueue create(QueueFile queueFile, FileObjectQueue.Converter converter) + throws IOException { + return ObjectQueue.create(queueFile, converter); + } + }, + MEMORY() { + @Override + public ObjectQueue create(QueueFile file, FileObjectQueue.Converter converter) { + return ObjectQueue.createInMemory(); + } + }; + + public abstract ObjectQueue create(QueueFile queueFile, + FileObjectQueue.Converter converter) throws IOException; + } + + @Rule public TemporaryFolder folder = new TemporaryFolder(); + @Burst QueueFactory factory; + ObjectQueue queue; + + @Before public void setUp() throws IOException { + File parent = folder.getRoot(); + File file = new File(parent, "object-queue"); + QueueFile queueFile = new QueueFile.Builder(file).build(); + + queue = factory.create(queueFile, new StringConverter()); + queue.add("one"); + queue.add("two"); + queue.add("three"); + } + + @Test public void size() throws IOException { + assertThat(queue.size()).isEqualTo(3); + } + + @Test public void peek() throws IOException { + assertThat(queue.peek()).isEqualTo("one"); + } + + @Test public void peekMultiple() throws IOException { + assertThat(queue.peek(2)).containsExactly("one", "two"); + } + + @Test public void peekMaxCanExceedQueueDepth() throws IOException { + assertThat(queue.peek(6)).containsExactly("one", "two", "three"); + } + + @Test public void asList() throws IOException { + assertThat(queue.asList()).containsExactly("one", "two", "three"); + } + + @Test public void remove() throws IOException { + queue.remove(); + + assertThat(queue.asList()).containsExactly("two", "three"); + } + + @Test public void removeMultiple() throws IOException { + queue.remove(2); + + assertThat(queue.asList()).containsExactly("three"); + } + + @Test public void clear() throws IOException { + queue.clear(); + + assertThat(queue.size()).isEqualTo(0); + } + + @Test public void isEmpty() throws IOException { + assertThat(queue.isEmpty()).isFalse(); + + queue.clear(); + + assertThat(queue.isEmpty()).isTrue(); + } + + @Test public void testIterator() throws IOException { + final List saw = new ArrayList<>(); + for (String pojo : queue) { + saw.add(pojo); + } + assertThat(saw).containsExactly("one", "two", "three"); + } + + @Test public void testIteratorNextThrowsWhenEmpty() throws IOException { + queue.clear(); + Iterator iterator = queue.iterator(); + + try { + iterator.next(); + fail(); + } catch (NoSuchElementException ignored) { + } + } + + @Test public void testIteratorNextThrowsWhenExhausted() throws IOException { + Iterator iterator = queue.iterator(); + iterator.next(); + iterator.next(); + iterator.next(); + + try { + iterator.next(); + fail(); + } catch (NoSuchElementException ignored) { + } + } + + @Test public void testIteratorRemove() throws IOException { + Iterator iterator = queue.iterator(); + + iterator.next(); + iterator.remove(); + assertThat(queue.asList()).containsExactly("two", "three"); + + iterator.next(); + iterator.remove(); + assertThat(queue.asList()).containsExactly("three"); + } + + @Test public void testIteratorRemoveDisallowsConcurrentModification() throws IOException { + Iterator iterator = queue.iterator(); + iterator.next(); + queue.remove(); + + try { + iterator.remove(); + fail(); + } catch (ConcurrentModificationException ignored) { + } + } + + @Test public void testIteratorHasNextDisallowsConcurrentModification() throws IOException { + Iterator iterator = queue.iterator(); + iterator.next(); + queue.remove(); + + try { + iterator.hasNext(); + fail(); + } catch (ConcurrentModificationException ignored) { + } + } + + @Test public void testIteratorDisallowsConcurrentModificationWithClear() throws IOException { + Iterator iterator = queue.iterator(); + iterator.next(); + queue.clear(); + + try { + iterator.hasNext(); + fail(); + } catch (ConcurrentModificationException ignored) { + } + } + + @Test public void testIteratorOnlyRemovesFromHead() throws IOException { + Iterator iterator = queue.iterator(); + iterator.next(); + iterator.next(); + + try { + iterator.remove(); + fail(); + } catch (UnsupportedOperationException ex) { + assertThat(ex).hasMessageThat().isEqualTo("Removal is only permitted from the head."); + } + } + + @Test public void iteratorThrowsIOException() throws IOException { + File parent = folder.getRoot(); + File file = new File(parent, "object-queue"); + QueueFile queueFile = new QueueFile.Builder(file).build(); + ObjectQueue queue = ObjectQueue.create(queueFile, new ObjectQueue.Converter() { + @Override public String from(byte[] bytes) throws IOException { + throw new IOException(); + } + + @Override public void toStream(Object o, OutputStream bytes) { + } + }); + queue.add(new Object()); + Iterator iterator = queue.iterator(); + try { + iterator.next(); + fail(); + } catch (Exception ioe) { + assertThat(ioe).isInstanceOf(IOException.class); + } + } + + static class StringConverter implements FileObjectQueue.Converter { + @Override public String from(byte[] bytes) throws IOException { + return new String(bytes, "UTF-8"); + } + + @Override public void toStream(String s, OutputStream os) throws IOException { + os.write(s.getBytes("UTF-8")); + } + } +} diff --git a/datastructures-queue-tape/src/test/java/com/squareup/tape2/QueueFileLoadingTest.java b/datastructures-queue-tape/src/test/java/com/squareup/tape2/QueueFileLoadingTest.java new file mode 100644 index 0000000..a755fe0 --- /dev/null +++ b/datastructures-queue-tape/src/test/java/com/squareup/tape2/QueueFileLoadingTest.java @@ -0,0 +1,103 @@ +package com.squareup.tape2; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.xbib.datastructures.queue.tape.FileObjectQueue; +import org.xbib.datastructures.queue.tape.QueueFile; + +import static com.squareup.tape2.QueueTestUtils.EMPTY_SERIALIZED_QUEUE; +import static com.squareup.tape2.QueueTestUtils.FRESH_SERIALIZED_QUEUE; +import static com.squareup.tape2.QueueTestUtils.ONE_ENTRY_SERIALIZED_QUEUE; +import static com.squareup.tape2.QueueTestUtils.TRUNCATED_EMPTY_SERIALIZED_QUEUE; +import static com.squareup.tape2.QueueTestUtils.TRUNCATED_ONE_ENTRY_SERIALIZED_QUEUE; +import static com.squareup.tape2.QueueTestUtils.copyTestFile; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(JUnit4.class) +public final class QueueFileLoadingTest { + + private File testFile; + + @After public void cleanup() { + assertTrue("Failed to delete test file " + testFile.getPath(), testFile.delete()); + } + + @Test public void testMissingFileInitializes() throws Exception { + testFile = File.createTempFile(FRESH_SERIALIZED_QUEUE, "test"); + assertTrue(testFile.delete()); + assertFalse(testFile.exists()); + QueueFile queue = new QueueFile.Builder(testFile).build(); + assertEquals(0, queue.size()); + assertTrue(testFile.exists()); + queue.close(); + } + + @Test public void testEmptyFileInitializes() throws Exception { + testFile = copyTestFile(EMPTY_SERIALIZED_QUEUE); + QueueFile queue = new QueueFile.Builder(testFile).build(); + assertEquals(0, queue.size()); + queue.close(); + } + + @Test public void testSingleEntryFileInitializes() throws Exception { + testFile = copyTestFile(ONE_ENTRY_SERIALIZED_QUEUE); + QueueFile queue = new QueueFile.Builder(testFile).build(); + assertEquals(1, queue.size()); + queue.close(); + } + + @Test(expected = IOException.class) + public void testTruncatedEmptyFileThrows() throws Exception { + testFile = copyTestFile(TRUNCATED_EMPTY_SERIALIZED_QUEUE); + new QueueFile.Builder(testFile).build(); + } + + @Test(expected = IOException.class) + public void testTruncatedOneEntryFileThrows() throws Exception { + testFile = copyTestFile(TRUNCATED_ONE_ENTRY_SERIALIZED_QUEUE); + new QueueFile.Builder(testFile).build(); + } + + @Test(expected = IOException.class) + public void testCreateWithReadOnlyFileThrowsException() throws Exception { + testFile = copyTestFile(TRUNCATED_ONE_ENTRY_SERIALIZED_QUEUE); + assertTrue(testFile.setWritable(false)); + + // Should throw an exception. + new QueueFile.Builder(testFile).build(); + } + + @Test(expected = IOException.class) + public void testAddWithReadOnlyFileMissesMonitor() throws Exception { + testFile = copyTestFile(EMPTY_SERIALIZED_QUEUE); + + QueueFile qf = new QueueFile.Builder(testFile).build(); + + // Should throw an exception. + FileObjectQueue queue = + new FileObjectQueue<>(qf, new FileObjectQueue.Converter() { + @Override public String from(byte[] bytes) throws IOException { + return null; + } + + @Override public void toStream(String o, OutputStream bytes) + throws IOException { + throw new IOException("fake Permission denied"); + } + }); + + // Should throw an exception. + try { + queue.add("trouble"); + } finally { + queue.close(); + } + } +} diff --git a/datastructures-queue-tape/src/test/java/com/squareup/tape2/QueueFileTest.java b/datastructures-queue-tape/src/test/java/com/squareup/tape2/QueueFileTest.java new file mode 100644 index 0000000..c7274f3 --- /dev/null +++ b/datastructures-queue-tape/src/test/java/com/squareup/tape2/QueueFileTest.java @@ -0,0 +1,994 @@ +package com.squareup.tape2; + +import org.xbib.datastructures.queue.tape.QueueFile; +import org.xbib.datastructures.queue.tape.QueueFile.Element; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.logging.Logger; +import okio.BufferedSource; +import okio.Okio; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static com.google.common.truth.Truth.assertThat; +import static org.xbib.datastructures.queue.tape.QueueFile.INITIAL_LENGTH; +import static org.junit.Assert.fail; + +/** + * Tests for QueueFile. + * + * @author Bob Lee (bob@squareup.com) + */ +@SuppressWarnings("ResultOfMethodCallIgnored") +@RunWith(Parameterized.class) +public class QueueFileTest { + @Parameterized.Parameters(name = "{0}") + public static List parameters() { + return Arrays.asList( + new Object[] {"Legacy" , true , 16}, + new Object[] {"Versioned", false, 32} + ); + } + + private static final Logger logger = Logger.getLogger(QueueFileTest.class.getName()); + + /** + * Takes up 33401 bytes in the queue (N*(N+1)/2+4*N). Picked 254 instead of 255 so that the number + * of bytes isn't a multiple of 4. + */ + private static final int N = 254; + private static byte[][] values = new byte[N][]; + + static { + for (int i = 0; i < N; i++) { + byte[] value = new byte[i]; + // Example: values[3] = { 3, 2, 1 } + for (int ii = 0; ii < i; ii++) + value[ii] = (byte) (i - ii); + values[i] = value; + } + } + + private final boolean forceLegacy; + private final int headerLength; + + @Rule public TemporaryFolder folder = new TemporaryFolder(); + private File file; + + public QueueFileTest(String name, boolean forceLegacy, int headerLength) { + this.forceLegacy = forceLegacy; + this.headerLength = headerLength; + } + + private QueueFile newQueueFile() throws IOException { + return newQueueFile(true); + } + + private QueueFile newQueueFile(RandomAccessFile raf) throws IOException { + return new QueueFile(this.file, raf, true, forceLegacy); + } + + private QueueFile newQueueFile(boolean zero) throws IOException { + return new QueueFile.Builder(file).zero(zero).forceLegacy(forceLegacy).build(); + } + + @Before public void setUp() throws Exception { + File parent = folder.getRoot(); + file = new File(parent, "queue-file"); + } + + @Test public void testAddOneElement() throws IOException { + // This test ensures that we update 'first' correctly. + QueueFile queue = newQueueFile(); + byte[] expected = values[253]; + queue.add(expected); + assertThat(queue.peek()).isEqualTo(expected); + queue.close(); + queue = newQueueFile(); + assertThat(queue.peek()).isEqualTo(expected); + } + + @Test public void testClearErases() throws IOException { + QueueFile queue = newQueueFile(); + byte[] expected = values[253]; + queue.add(expected); + + // Confirm that the data was in the file before we cleared. + byte[] data = new byte[expected.length]; + queue.raf.seek(headerLength + Element.HEADER_LENGTH); + queue.raf.readFully(data, 0, expected.length); + assertThat(data).isEqualTo(expected); + + queue.clear(); + + // Should have been erased. + queue.raf.seek(headerLength + Element.HEADER_LENGTH); + queue.raf.readFully(data, 0, expected.length); + assertThat(data).isEqualTo(new byte[expected.length]); + } + + @Test public void testClearDoesNotCorrupt() throws IOException { + QueueFile queue = newQueueFile(); + byte[] stuff = values[253]; + queue.add(stuff); + queue.clear(); + + queue = newQueueFile(); + assertThat(queue.isEmpty()).isTrue(); + assertThat(queue.peek()).isNull(); + + queue.add(values[25]); + assertThat(queue.peek()).isEqualTo(values[25]); + } + + @Test public void removeErasesEagerly() throws IOException { + QueueFile queue = newQueueFile(); + + byte[] firstStuff = values[127]; + queue.add(firstStuff); + + byte[] secondStuff = values[253]; + queue.add(secondStuff); + + // Confirm that first stuff was in the file before we remove. + byte[] data = new byte[firstStuff.length]; + queue.raf.seek(headerLength + Element.HEADER_LENGTH); + queue.raf.readFully(data, 0, firstStuff.length); + assertThat(data).isEqualTo(firstStuff); + + queue.remove(); + + // Next record is intact + assertThat(queue.peek()).isEqualTo(secondStuff); + + // First should have been erased. + queue.raf.seek(headerLength + Element.HEADER_LENGTH); + queue.raf.readFully(data, 0, firstStuff.length); + assertThat(data).isEqualTo(new byte[firstStuff.length]); + } + + @Test public void testZeroSizeInHeaderThrows() throws IOException { + RandomAccessFile emptyFile = new RandomAccessFile(file, "rwd"); + emptyFile.setLength(INITIAL_LENGTH); + emptyFile.getChannel().force(true); + emptyFile.close(); + + try { + newQueueFile(); + fail("Should have thrown about bad header length"); + } catch (IOException ex) { + assertThat(ex).hasMessageThat() + .isEqualTo("File is corrupt; length stored in header (0) is invalid."); + } + } + + @Test public void testSizeLessThanHeaderThrows() throws IOException { + RandomAccessFile emptyFile = new RandomAccessFile(file, "rwd"); + emptyFile.setLength(INITIAL_LENGTH); + if (forceLegacy) { + emptyFile.writeInt(headerLength - 1); + } else { + emptyFile.writeInt(0x80000001); + emptyFile.writeLong(headerLength - 1); + } + emptyFile.getChannel().force(true); + emptyFile.close(); + + try { + newQueueFile(); + fail(); + } catch (IOException ex) { + assertThat(ex.getMessage()).isIn( + Arrays.asList("File is corrupt; length stored in header (15) is invalid.", + "File is corrupt; length stored in header (31) is invalid.")); + } + } + + @Test public void testNegativeSizeInHeaderThrows() throws IOException { + RandomAccessFile emptyFile = new RandomAccessFile(file, "rwd"); + emptyFile.seek(0); + emptyFile.writeInt(-2147483648); + emptyFile.setLength(INITIAL_LENGTH); + emptyFile.getChannel().force(true); + emptyFile.close(); + + try { + newQueueFile(); + fail("Should have thrown about bad header length"); + } catch (IOException ex) { + assertThat(ex.getMessage()).isIn( + Arrays.asList("File is corrupt; length stored in header (-2147483648) is invalid.", + "Unable to read version 0 format. Supported versions are 1 and legacy.")); + } + } + + @Test public void removeMultipleDoesNotCorrupt() throws IOException { + QueueFile queue = newQueueFile(); + for (int i = 0; i < 10; i++) { + queue.add(values[i]); + } + + queue.remove(1); + assertThat(queue.size()).isEqualTo(9); + assertThat(queue.peek()).isEqualTo(values[1]); + + queue.remove(3); + queue = newQueueFile(); + assertThat(queue.size()).isEqualTo(6); + assertThat(queue.peek()).isEqualTo(values[4]); + + queue.remove(6); + assertThat(queue.isEmpty()).isTrue(); + assertThat(queue.peek()).isNull(); + } + + @Test public void removeDoesNotCorrupt() throws IOException { + QueueFile queue = newQueueFile(); + + queue.add(values[127]); + byte[] secondStuff = values[253]; + queue.add(secondStuff); + queue.remove(); + + queue = newQueueFile(); + assertThat(queue.peek()).isEqualTo(secondStuff); + } + + @Test public void removeFromEmptyFileThrows() throws IOException { + QueueFile queue = newQueueFile(); + + try { + queue.remove(); + fail("Should have thrown about removing from empty file."); + } catch (NoSuchElementException ignored) { + } + } + + @Test public void removeZeroFromEmptyFileDoesNothing() throws IOException { + QueueFile queue = newQueueFile(); + queue.remove(0); + assertThat(queue.isEmpty()).isTrue(); + } + + @Test public void removeNegativeNumberOfElementsThrows() throws IOException { + QueueFile queue = newQueueFile(); + queue.add(values[127]); + + try { + queue.remove(-1); + fail("Should have thrown about removing negative number of elements."); + } catch (IllegalArgumentException ex) { + assertThat(ex) // + .hasMessageThat().isEqualTo("Cannot remove negative (-1) number of elements."); + } + } + + @Test public void removeZeroElementsDoesNothing() throws IOException { + QueueFile queue = newQueueFile(); + queue.add(values[127]); + + queue.remove(0); + assertThat(queue.size()).isEqualTo(1); + } + + @Test public void removeBeyondQueueSizeElementsThrows() throws IOException { + QueueFile queue = newQueueFile(); + queue.add(values[127]); + + try { + queue.remove(10); + fail("Should have thrown about removing too many elements."); + } catch (IllegalArgumentException ex) { + assertThat(ex) // + .hasMessageThat() + .isEqualTo("Cannot remove more elements (10) than present in queue (1)."); + } + } + + @Test public void removingBigDamnBlocksErasesEffectively() throws IOException { + byte[] bigBoy = new byte[7000]; + for (int i = 0; i < 7000; i += 100) { + System.arraycopy(values[100], 0, bigBoy, i, values[100].length); + } + + QueueFile queue = newQueueFile(); + + queue.add(bigBoy); + byte[] secondStuff = values[123]; + queue.add(secondStuff); + + // Confirm that bigBoy was in the file before we remove. + byte[] data = new byte[bigBoy.length]; + queue.raf.seek(headerLength + Element.HEADER_LENGTH); + queue.raf.readFully(data, 0, bigBoy.length); + assertThat(data).isEqualTo(bigBoy); + + queue.remove(); + + // Next record is intact + assertThat(queue.peek()).isEqualTo(secondStuff); + + // First should have been erased. + queue.raf.seek(headerLength + Element.HEADER_LENGTH); + queue.raf.readFully(data, 0, bigBoy.length); + assertThat(data).isEqualTo(new byte[bigBoy.length]); + } + + @Test public void testAddAndRemoveElements() throws IOException { + long start = System.nanoTime(); + + Queue expected = new ArrayDeque<>(); + + for (int round = 0; round < 5; round++) { + QueueFile queue = newQueueFile(); + for (int i = 0; i < N; i++) { + queue.add(values[i]); + expected.add(values[i]); + } + + // Leave N elements in round N, 15 total for 5 rounds. Removing all the + // elements would be like starting with an empty queue. + for (int i = 0; i < N - round - 1; i++) { + assertThat(queue.peek()).isEqualTo(expected.remove()); + queue.remove(); + } + queue.close(); + } + + // Remove and validate remaining 15 elements. + QueueFile queue = newQueueFile(); + assertThat(queue.size()).isEqualTo(15); + assertThat(queue.size()).isEqualTo(expected.size()); + while (!expected.isEmpty()) { + assertThat(queue.peek()).isEqualTo(expected.remove()); + queue.remove(); + } + queue.close(); + + // length() returns 0, but I checked the size w/ 'ls', and it is correct. + // assertEquals(65536, file.length()); + + logger.info("Ran in " + ((System.nanoTime() - start) / 1000000) + "ms."); + } + + /** Tests queue expansion when the data crosses EOF. */ + @Test public void testSplitExpansion() throws IOException { + // This should result in 3560 bytes. + int max = 80; + + Queue expected = new ArrayDeque<>(); + QueueFile queue = newQueueFile(); + + for (int i = 0; i < max; i++) { + expected.add(values[i]); + queue.add(values[i]); + } + + // Remove all but 1. + for (int i = 1; i < max; i++) { + assertThat(queue.peek()).isEqualTo(expected.remove()); + queue.remove(); + } + + // This should wrap around before expanding. + for (int i = 0; i < N; i++) { + expected.add(values[i]); + queue.add(values[i]); + } + + while (!expected.isEmpty()) { + assertThat(queue.peek()).isEqualTo(expected.remove()); + queue.remove(); + } + + queue.close(); + } + + /** Tests failed queue expansion when the data crosses EOF. */ + @Test public void testFailedSplitExpansion() throws IOException { + // This should results in a full file, but doesn't trigger an expansion (yet) + int max = 86; + + Queue expected = new ArrayDeque<>(); + QueueFile queue = newQueueFile(); + + for (int i = 0; i < max; i++) { + expected.add(values[i]); + queue.add(values[i]); + } + + // Remove all but 1 value and add back + // This should wrap around before expanding. + for (int i = 0; i < max - 1; i++) { + assertThat(queue.peek()).isEqualTo(expected.remove()); + queue.remove(); + + expected.add(values[i]); + queue.add(values[i]); + } + + //Try to insert element that causes file expansion, but fail + long fileLengthBeforeExpansion = file.length(); + BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd"); + queue = newQueueFile(braf); + try { + queue.add(values[max]); + fail(); + } catch (IOException e) { /* expected */ } + + //Check that the queue continues valid + braf.rejectCommit = false; + while (!expected.isEmpty()) { + assertThat(queue.peek()).isEqualTo(expected.remove()); + queue.remove(); + } + + queue.close(); + } + + @Test public void testFailedAdd() throws IOException { + QueueFile queueFile = newQueueFile(); + queueFile.add(values[253]); + queueFile.close(); + + BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd"); + queueFile = newQueueFile(braf); + + try { + queueFile.add(values[252]); + fail(); + } catch (IOException e) { /* expected */ } + + braf.rejectCommit = false; + + // Allow a subsequent add to succeed. + queueFile.add(values[251]); + + queueFile.close(); + + queueFile = newQueueFile(); + assertThat(queueFile.size()).isEqualTo(2); + assertThat(queueFile.peek()).isEqualTo(values[253]); + queueFile.remove(); + assertThat(queueFile.peek()).isEqualTo(values[251]); + } + + @Test public void testFailedRemoval() throws IOException { + QueueFile queueFile = newQueueFile(); + queueFile.add(values[253]); + queueFile.close(); + + BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd"); + queueFile = newQueueFile(braf); + + try { + queueFile.remove(); + fail(); + } catch (IOException e) { /* expected */ } + + queueFile.close(); + + queueFile = newQueueFile(); + assertThat(queueFile.size()).isEqualTo(1); + assertThat(queueFile.peek()).isEqualTo(values[253]); + + queueFile.add(values[99]); + queueFile.remove(); + assertThat(queueFile.peek()).isEqualTo(values[99]); + } + + @Test public void testFailedExpansion() throws IOException { + QueueFile queueFile = newQueueFile(); + queueFile.add(values[253]); + queueFile.close(); + + BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd"); + queueFile = newQueueFile(braf); + + try { + // This should trigger an expansion which should fail. + queueFile.add(new byte[8000]); + fail(); + } catch (IOException e) { /* expected */ } + + queueFile.close(); + + queueFile = newQueueFile(); + assertThat(queueFile.size()).isEqualTo(1); + assertThat(queueFile.peek()).isEqualTo(values[253]); + assertThat(queueFile.fileLength).isEqualTo(4096); + + queueFile.add(values[99]); + queueFile.remove(); + assertThat(queueFile.peek()).isEqualTo(values[99]); + } + + /** + * Exercise a bug where wrapped elements were getting corrupted when the + * QueueFile was forced to expand in size and a portion of the final Element + * had been wrapped into space at the beginning of the file. + */ + @Test public void testFileExpansionDoesntCorruptWrappedElements() + throws IOException { + QueueFile queue = newQueueFile(); + + // Create test data - 1k blocks marked consecutively 1, 2, 3, 4 and 5. + byte[][] values = new byte[5][]; + for (int blockNum = 0; blockNum < values.length; blockNum++) { + values[blockNum] = new byte[1024]; + for (int i = 0; i < values[blockNum].length; i++) { + values[blockNum][i] = (byte) (blockNum + 1); + } + } + + // First, add the first two blocks to the queue, remove one leaving a + // 1K space at the beginning of the buffer. + queue.add(values[0]); + queue.add(values[1]); + queue.remove(); + + // The trailing end of block "4" will be wrapped to the start of the buffer. + queue.add(values[2]); + queue.add(values[3]); + + // Cause buffer to expand as there isn't space between the end of block "4" + // and the start of block "2". Internally the queue should cause block "4" + // to be contiguous, but there was a bug where that wasn't happening. + queue.add(values[4]); + + // Make sure values are not corrupted, specifically block "4" that wasn't + // being made contiguous in the version with the bug. + for (int blockNum = 1; blockNum < values.length; blockNum++) { + byte[] value = queue.peek(); + queue.remove(); + + for (int i = 0; i < value.length; i++) { + assertThat(value[i]).named("Block %1$d corrupted at byte index %2$d.", blockNum + 1, i) + .isEqualTo((byte) (blockNum + 1)); + } + } + + queue.close(); + } + + /** + * Exercise a bug where wrapped elements were getting corrupted when the + * QueueFile was forced to expand in size and a portion of the final Element + * had been wrapped into space at the beginning of the file - if multiple + * Elements have been written to empty buffer space at the start does the + * expansion correctly update all their positions? + */ + @Test public void testFileExpansionCorrectlyMovesElements() throws IOException { + QueueFile queue = newQueueFile(); + + // Create test data - 1k blocks marked consecutively 1, 2, 3, 4 and 5. + byte[][] values = new byte[5][]; + for (int blockNum = 0; blockNum < values.length; blockNum++) { + values[blockNum] = new byte[1024]; + for (int i = 0; i < values[blockNum].length; i++) { + values[blockNum][i] = (byte) (blockNum + 1); + } + } + + // smaller data elements + byte[][] smaller = new byte[3][]; + for (int blockNum = 0; blockNum < smaller.length; blockNum++) { + smaller[blockNum] = new byte[256]; + for (int i = 0; i < smaller[blockNum].length; i++) { + smaller[blockNum][i] = (byte) (blockNum + 6); + } + } + + // First, add the first two blocks to the queue, remove one leaving a + // 1K space at the beginning of the buffer. + queue.add(values[0]); + queue.add(values[1]); + queue.remove(); + + // The trailing end of block "4" will be wrapped to the start of the buffer. + queue.add(values[2]); + queue.add(values[3]); + + // Now fill in some space with smaller blocks, none of which will cause + // an expansion. + queue.add(smaller[0]); + queue.add(smaller[1]); + queue.add(smaller[2]); + + // Cause buffer to expand as there isn't space between the end of the + // smaller block "8" and the start of block "2". Internally the queue + // should cause all of tbe smaller blocks, and the trailing end of + // block "5" to be moved to the end of the file. + queue.add(values[4]); + + byte[] expectedBlockNumbers = {2, 3, 4, 6, 7, 8, 5}; + + // Make sure values are not corrupted, specifically block "4" that wasn't + // being made contiguous in the version with the bug. + for (byte expectedBlockNumber : expectedBlockNumbers) { + byte[] value = queue.peek(); + queue.remove(); + + for (int i = 0; i < value.length; i++) { + assertThat(value[i]).named("Block %1$d corrupted at byte index %2$d.", expectedBlockNumber, + i).isEqualTo(expectedBlockNumber); + } + } + + queue.close(); + } + + @Test public void removingElementZeroesData() throws IOException { + QueueFile queueFile = newQueueFile(true); + queueFile.add(values[4]); + queueFile.remove(); + queueFile.close(); + + BufferedSource source = Okio.buffer(Okio.source(file)); + source.skip(headerLength); + source.skip(Element.HEADER_LENGTH); + assertThat(source.readByteString(4).hex()).isEqualTo("00000000"); + } + + @Test public void removingElementDoesNotZeroData() throws IOException { + QueueFile queueFile = newQueueFile(false); + queueFile.add(values[4]); + queueFile.remove(); + queueFile.close(); + + BufferedSource source = Okio.buffer(Okio.source(file)); + source.skip(headerLength); + source.skip(Element.HEADER_LENGTH); + assertThat(source.readByteString(4).hex()).isEqualTo("04030201"); + + source.close(); + } + + /** + * Exercise a bug where file expansion would leave garbage at the start of the header + * and after the last element. + */ + @Test public void testFileExpansionCorrectlyZeroesData() + throws IOException { + QueueFile queue = newQueueFile(); + + // Create test data - 1k blocks marked consecutively 1, 2, 3, 4 and 5. + byte[][] values = new byte[5][]; + for (int blockNum = 0; blockNum < values.length; blockNum++) { + values[blockNum] = new byte[1024]; + for (int i = 0; i < values[blockNum].length; i++) { + values[blockNum][i] = (byte) (blockNum + 1); + } + } + + // First, add the first two blocks to the queue, remove one leaving a + // 1K space at the beginning of the buffer. + queue.add(values[0]); + queue.add(values[1]); + queue.remove(); + + // The trailing end of block "4" will be wrapped to the start of the buffer. + queue.add(values[2]); + queue.add(values[3]); + + // Cause buffer to expand as there isn't space between the end of block "4" + // and the start of block "2". Internally the queue will cause block "4" + // to be contiguous. There was a bug where the start of the buffer still + // contained the tail end of block "4", and garbage was copied after the tail + // end of the last element. + queue.add(values[4]); + + // Read from header to first element and make sure it's zeroed. + int firstElementPadding = Element.HEADER_LENGTH + 1024; + byte[] data = new byte[firstElementPadding]; + queue.raf.seek(headerLength); + queue.raf.readFully(data, 0, firstElementPadding); + assertThat(data).isEqualTo(new byte[firstElementPadding]); + + // Read from the last element to the end and make sure it's zeroed. + int endOfLastElement = headerLength + firstElementPadding + 4 * (Element.HEADER_LENGTH + 1024); + int readLength = (int) (queue.raf.length() - endOfLastElement); + data = new byte[readLength]; + queue.raf.seek(endOfLastElement); + queue.raf.readFully(data, 0, readLength); + assertThat(data).isEqualTo(new byte[readLength]); + } + + /** + * Exercise a bug where an expanding queue file where the start and end positions + * are the same causes corruption. + */ + @Test public void testSaturatedFileExpansionMovesElements() throws IOException { + QueueFile queue = newQueueFile(); + + // Create test data - 1016-byte blocks marked consecutively 1, 2, 3, 4, 5 and 6, + // four of which perfectly fill the queue file, taking into account the file header + // and the item headers. + // Each item is of length + // (QueueFile.INITIAL_LENGTH - headerLength) / 4 - element_header_length + // = 1016 bytes + byte[][] values = new byte[6][]; + for (int blockNum = 0; blockNum < values.length; blockNum++) { + values[blockNum] = new byte[(INITIAL_LENGTH - headerLength) / 4 - Element.HEADER_LENGTH]; + for (int i = 0; i < values[blockNum].length; i++) { + values[blockNum][i] = (byte) (blockNum + 1); + } + } + + // Saturate the queue file + queue.add(values[0]); + queue.add(values[1]); + queue.add(values[2]); + queue.add(values[3]); + + // Remove an element and add a new one so that the position of the start and + // end of the queue are equal + queue.remove(); + queue.add(values[4]); + + // Cause the queue file to expand + queue.add(values[5]); + + // Make sure values are not corrupted + for (int i = 1; i < 6; i++) { + assertThat(queue.peek()).isEqualTo(values[i]); + queue.remove(); + } + + queue.close(); + } + + /** + * Exercise a bug where opening a queue whose first or last element's header + * was non contiguous throws an {@link java.io.EOFException}. + */ + @Test public void testReadHeadersFromNonContiguousQueueWorks() throws IOException { + QueueFile queueFile = newQueueFile(); + + // Fill the queue up to `length - 2` (i.e. remainingBytes() == 2). + for (int i = 0; i < 15; i++) { + queueFile.add(values[N - 1]); + } + queueFile.add(values[219]); + + // Remove first item so we have room to add another one without growing the file. + queueFile.remove(); + + // Add any element element and close the queue. + queueFile.add(values[6]); + int queueSize = queueFile.size(); + queueFile.close(); + + // File should not be corrupted. + QueueFile queueFile2 = newQueueFile(); + assertThat(queueFile2.size()).isEqualTo(queueSize); + } + + @Test public void testIterator() throws IOException { + byte[] data = values[10]; + + for (int i = 0; i < 10; i++) { + QueueFile queueFile = newQueueFile(); + for (int j = 0; j < i; j++) { + queueFile.add(data); + } + + int saw = 0; + for (byte[] element : queueFile) { + assertThat(element).isEqualTo(data); + saw++; + } + assertThat(saw).isEqualTo(i); + queueFile.close(); + file.delete(); + } + } + + @Test public void testIteratorNextThrowsWhenEmpty() throws IOException { + QueueFile queueFile = newQueueFile(); + + Iterator iterator = queueFile.iterator(); + + try { + iterator.next(); + fail(); + } catch (NoSuchElementException ignored) { + } + } + + @Test public void testIteratorNextThrowsWhenExhausted() throws IOException { + QueueFile queueFile = newQueueFile(); + queueFile.add(values[0]); + + Iterator iterator = queueFile.iterator(); + iterator.next(); + + try { + iterator.next(); + fail(); + } catch (NoSuchElementException ignored) { + } + } + + @Test public void testIteratorRemove() throws IOException { + QueueFile queueFile = newQueueFile(); + for (int i = 0; i < 15; i++) { + queueFile.add(values[i]); + } + + Iterator iterator = queueFile.iterator(); + while (iterator.hasNext()) { + iterator.next(); + iterator.remove(); + } + + assertThat(queueFile).isEmpty(); + } + + @Test public void testIteratorRemoveDisallowsConcurrentModification() throws IOException { + QueueFile queueFile = newQueueFile(); + for (int i = 0; i < 15; i++) { + queueFile.add(values[i]); + } + + Iterator iterator = queueFile.iterator(); + iterator.next(); + queueFile.remove(); + try { + iterator.remove(); + fail(); + } catch (ConcurrentModificationException ignored) { + } + } + + @Test public void testIteratorHasNextDisallowsConcurrentModification() throws IOException { + QueueFile queueFile = newQueueFile(); + for (int i = 0; i < 15; i++) { + queueFile.add(values[i]); + } + + Iterator iterator = queueFile.iterator(); + iterator.next(); + queueFile.remove(); + try { + iterator.hasNext(); + fail(); + } catch (ConcurrentModificationException ignored) { + } + } + + @Test public void testIteratorDisallowsConcurrentModificationWithClear() throws IOException { + QueueFile queueFile = newQueueFile(); + for (int i = 0; i < 15; i++) { + queueFile.add(values[i]); + } + + Iterator iterator = queueFile.iterator(); + iterator.next(); + queueFile.clear(); + try { + iterator.hasNext(); + fail(); + } catch (ConcurrentModificationException ignored) { + } + } + + @Test public void testIteratorOnlyRemovesFromHead() throws IOException { + QueueFile queueFile = newQueueFile(); + for (int i = 0; i < 15; i++) { + queueFile.add(values[i]); + } + + Iterator iterator = queueFile.iterator(); + iterator.next(); + iterator.next(); + + try { + iterator.remove(); + fail(); + } catch (UnsupportedOperationException ex) { + assertThat(ex).hasMessageThat().isEqualTo("Removal is only permitted from the head."); + } + } + + @Test public void iteratorThrowsIOException() throws IOException { + QueueFile queueFile = newQueueFile(); + queueFile.add(values[253]); + queueFile.close(); + + final class BrokenRandomAccessFile extends RandomAccessFile { + boolean fail = false; + + BrokenRandomAccessFile(File file, String mode) + throws FileNotFoundException { + super(file, mode); + } + + @Override public void write(byte[] b, int off, int len) throws IOException { + if (fail) { + throw new IOException(); + } + super.write(b, off, len); + } + + @Override public int read(byte[] b, int off, int len) throws IOException { + if (fail) { + throw new IOException(); + } + return super.read(b, off, len); + } + } + BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd"); + queueFile = newQueueFile(braf); + Iterator iterator = queueFile.iterator(); + + braf.fail = true; + try { + iterator.next(); + fail(); + } catch (Exception ioe) { + assertThat(ioe).isInstanceOf(IOException.class); + } + + braf.fail = false; + iterator.next(); + + braf.fail = true; + try { + iterator.remove(); + fail(); + } catch (Exception ioe) { + assertThat(ioe).isInstanceOf(IOException.class); + } + } + + @Test public void queueToString() throws IOException { + QueueFile queueFile = newQueueFile(); + for (int i = 0; i < 15; i++) { + queueFile.add(values[i]); + } + + if (forceLegacy) { + assertThat(queueFile.toString()).contains("zero=true, versioned=false, length=4096," + + " size=15," + + " first=Element[position=16, length=0], last=Element[position=163, length=14]}"); + } else { + assertThat(queueFile.toString()).contains("zero=true, versioned=true, length=4096," + + " size=15," + + " first=Element[position=32, length=0], last=Element[position=179, length=14]}"); + } + } + + /** + * A RandomAccessFile that can break when you go to write the COMMITTED + * status. + */ + static final class BrokenRandomAccessFile extends RandomAccessFile { + boolean rejectCommit = true; + + BrokenRandomAccessFile(File file, String mode) + throws FileNotFoundException { + super(file, mode); + } + + @Override public void write(byte[] b, int off, int len) throws IOException { + if (rejectCommit && getFilePointer() == 0) { + throw new IOException("No commit for you!"); + } + super.write(b, off, len); + } + } +} diff --git a/datastructures-queue-tape/src/test/java/com/squareup/tape2/QueueTestUtils.java b/datastructures-queue-tape/src/test/java/com/squareup/tape2/QueueTestUtils.java new file mode 100644 index 0000000..ab7062e --- /dev/null +++ b/datastructures-queue-tape/src/test/java/com/squareup/tape2/QueueTestUtils.java @@ -0,0 +1,46 @@ +// Copyright 2012 Square, Inc. +package com.squareup.tape2; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import okio.BufferedSink; +import okio.Okio; + +import static org.junit.Assert.assertTrue; + +final class QueueTestUtils { + static final String TRUNCATED_ONE_ENTRY_SERIALIZED_QUEUE = + "/truncated-one-entry-serialized-queue"; + static final String TRUNCATED_EMPTY_SERIALIZED_QUEUE = "/truncated-empty-serialized-queue"; + static final String ONE_ENTRY_SERIALIZED_QUEUE = "/one-entry-serialized-queue"; + static final String EMPTY_SERIALIZED_QUEUE = "/empty-serialized-queue"; + static final String FRESH_SERIALIZED_QUEUE = "/fresh-serialized-queue"; + + static File copyTestFile(String file) throws IOException { + File newFile = File.createTempFile(file, "test"); + InputStream in = QueueTestUtils.class.getResourceAsStream(file); + try (BufferedSink sink = Okio.buffer(Okio.sink(newFile))) { + sink.writeAll(Okio.source(in)); + } + assertTrue(newFile.exists()); + return newFile; + } + + /** File that suppresses deletion. */ + static class UndeletableFile extends File { + private static final long serialVersionUID = 1L; + + UndeletableFile(String name) { + super(name); + } + + @Override public boolean delete() { + return false; + } + } + + private QueueTestUtils() { + throw new AssertionError("No instances."); + } +} diff --git a/datastructures-queue-tape/src/test/resources/empty-serialized-queue b/datastructures-queue-tape/src/test/resources/empty-serialized-queue new file mode 100644 index 0000000000000000000000000000000000000000..50e0af1bbaa12f27b3d4c5c57aaf00114cabde56 GIT binary patch literal 4096 qcmeIu0Sy2E48x#GxPP_xv48~X!hit-1`HT5V8DO@0|pEj*dBNf?*I`1 literal 0 HcmV?d00001 diff --git a/datastructures-queue-tape/src/test/resources/one-entry-serialized-queue b/datastructures-queue-tape/src/test/resources/one-entry-serialized-queue new file mode 100644 index 0000000000000000000000000000000000000000..b71bdb167b2c9eeb7f61e9036798892aeff2f12e GIT binary patch literal 4096 zcmZQz5MTfTMj!?;z7SRXkeXLgR0&cv3PwX