add queue tape
This commit is contained in:
parent
ace846e5ad
commit
44b7ae7de2
15 changed files with 2248 additions and 0 deletions
14
datastructures-queue-tape/build.gradle
Normal file
14
datastructures-queue-tape/build.gradle
Normal file
|
@ -0,0 +1,14 @@
|
||||||
|
dependencies {
|
||||||
|
testCompileOnly 'com.google.code.findbugs:jsr305:3.0.2'
|
||||||
|
testImplementation 'junit:junit:4.13.2'
|
||||||
|
testImplementation 'com.google.truth:truth:0.32'
|
||||||
|
testImplementation 'com.squareup.burst:burst-junit4:1.1.1'
|
||||||
|
testImplementation 'com.squareup.okio:okio:1.13.0'
|
||||||
|
testRuntimeOnly "org.junit.vintage:junit-vintage-engine:5.7.2"
|
||||||
|
}
|
||||||
|
|
||||||
|
test {
|
||||||
|
useJUnitPlatform {
|
||||||
|
includeEngines 'junit-jupiter', 'junit-vintage'
|
||||||
|
}
|
||||||
|
}
|
3
datastructures-queue-tape/src/main/java/module-info.java
Normal file
3
datastructures-queue-tape/src/main/java/module-info.java
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
module org.xbib.datastructures.queue.tape {
|
||||||
|
exports org.xbib.datastructures.queue.tape;
|
||||||
|
}
|
|
@ -0,0 +1,122 @@
|
||||||
|
package org.xbib.datastructures.queue.tape;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
public final class FileObjectQueue<T> extends ObjectQueue<T> {
|
||||||
|
|
||||||
|
private final QueueFile queueFile;
|
||||||
|
|
||||||
|
private final DirectByteArrayOutputStream bytes = new DirectByteArrayOutputStream();
|
||||||
|
|
||||||
|
final Converter<T> converter;
|
||||||
|
|
||||||
|
public FileObjectQueue(QueueFile queueFile, Converter<T> converter) {
|
||||||
|
this.queueFile = queueFile;
|
||||||
|
this.converter = converter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueueFile file() {
|
||||||
|
return queueFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int size() {
|
||||||
|
return queueFile.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isEmpty() {
|
||||||
|
return queueFile.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void add(T entry) throws IOException {
|
||||||
|
bytes.reset();
|
||||||
|
converter.toStream(entry, bytes);
|
||||||
|
queueFile.add(bytes.getArray(), 0, bytes.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T peek() throws IOException {
|
||||||
|
byte[] bytes = queueFile.peek();
|
||||||
|
if (bytes == null) return null;
|
||||||
|
return converter.from(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() throws IOException {
|
||||||
|
queueFile.remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove(int n) throws IOException {
|
||||||
|
queueFile.remove(n);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clear() throws IOException {
|
||||||
|
queueFile.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
queueFile.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<T> iterator() {
|
||||||
|
return new QueueFileIterator(queueFile.iterator());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "FileObjectQueue{" + "queueFile=" + queueFile + '}';
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class QueueFileIterator implements Iterator<T> {
|
||||||
|
final Iterator<byte[]> iterator;
|
||||||
|
|
||||||
|
QueueFileIterator(Iterator<byte[]> iterator) {
|
||||||
|
this.iterator = iterator;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return iterator.hasNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T next() {
|
||||||
|
byte[] data = iterator.next();
|
||||||
|
try {
|
||||||
|
return converter.from(data);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw QueueFile.<Error>getSneakyThrowable(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
iterator.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enables direct access to the internal array. Avoids unnecessary copying.
|
||||||
|
*/
|
||||||
|
private static final class DirectByteArrayOutputStream extends ByteArrayOutputStream {
|
||||||
|
DirectByteArrayOutputStream() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets a reference to the internal byte array. The {@link #size()} method indicates how many
|
||||||
|
* bytes contain actual data added since the last {@link #reset()} call.
|
||||||
|
*/
|
||||||
|
byte[] getArray() {
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,127 @@
|
||||||
|
package org.xbib.datastructures.queue.tape;
|
||||||
|
|
||||||
|
import java.util.ArrayDeque;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.ConcurrentModificationException;
|
||||||
|
import java.util.Deque;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.NoSuchElementException;
|
||||||
|
|
||||||
|
public final class InMemoryObjectQueue<T> extends ObjectQueue<T> {
|
||||||
|
|
||||||
|
private final Deque<T> entries;
|
||||||
|
int modCount = 0;
|
||||||
|
boolean closed;
|
||||||
|
|
||||||
|
public InMemoryObjectQueue() {
|
||||||
|
entries = new ArrayDeque<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueueFile file() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void add(T entry) {
|
||||||
|
if (closed) throw new IllegalStateException("closed");
|
||||||
|
modCount++;
|
||||||
|
entries.addLast(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T peek() {
|
||||||
|
if (closed) throw new IllegalStateException("closed");
|
||||||
|
return entries.peekFirst();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<T> asList() {
|
||||||
|
return Collections.unmodifiableList(new ArrayList<>(entries));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int size() {
|
||||||
|
return entries.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
remove(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove(int n) {
|
||||||
|
if (closed) throw new IllegalStateException("closed");
|
||||||
|
modCount++;
|
||||||
|
for (int i = 0; i < n; i++) {
|
||||||
|
entries.removeFirst();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<T> iterator() {
|
||||||
|
return new EntryIterator(entries.iterator());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
closed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "InMemoryObjectQueue{"
|
||||||
|
+ "size=" + entries.size()
|
||||||
|
+ '}';
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class EntryIterator implements Iterator<T> {
|
||||||
|
private final Iterator<T> delegate;
|
||||||
|
private int index = 0;
|
||||||
|
|
||||||
|
private int expectedModCount = modCount;
|
||||||
|
|
||||||
|
EntryIterator(Iterator<T> delegate) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
checkForComodification();
|
||||||
|
return delegate.hasNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T next() {
|
||||||
|
if (closed) throw new IllegalStateException("closed");
|
||||||
|
checkForComodification();
|
||||||
|
|
||||||
|
T next = delegate.next();
|
||||||
|
index += 1;
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
if (closed) throw new IllegalStateException("closed");
|
||||||
|
checkForComodification();
|
||||||
|
|
||||||
|
if (size() == 0) throw new NoSuchElementException();
|
||||||
|
if (index != 1) {
|
||||||
|
throw new UnsupportedOperationException("Removal is only permitted from the head.");
|
||||||
|
}
|
||||||
|
|
||||||
|
InMemoryObjectQueue.this.remove();
|
||||||
|
|
||||||
|
expectedModCount = modCount;
|
||||||
|
index -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkForComodification() {
|
||||||
|
if (modCount != expectedModCount) throw new ConcurrentModificationException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,63 @@
|
||||||
|
package org.xbib.datastructures.queue.tape;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public abstract class ObjectQueue<T> implements Iterable<T>, Closeable {
|
||||||
|
|
||||||
|
public static <T> ObjectQueue<T> create(QueueFile qf, Converter<T> converter) {
|
||||||
|
return new FileObjectQueue<>(qf, converter);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> ObjectQueue<T> createInMemory() {
|
||||||
|
return new InMemoryObjectQueue<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract QueueFile file();
|
||||||
|
|
||||||
|
public abstract int size();
|
||||||
|
|
||||||
|
public boolean isEmpty() {
|
||||||
|
return size() == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract void add(T entry) throws IOException;
|
||||||
|
|
||||||
|
public abstract T peek() throws IOException;
|
||||||
|
|
||||||
|
public List<T> peek(int max) {
|
||||||
|
int end = Math.min(max, size());
|
||||||
|
List<T> subList = new ArrayList<>(end);
|
||||||
|
Iterator<T> iterator = iterator();
|
||||||
|
for (int i = 0; i < end; i++) {
|
||||||
|
subList.add(iterator.next());
|
||||||
|
}
|
||||||
|
return Collections.unmodifiableList(subList);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<T> asList() {
|
||||||
|
return peek(size());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void remove() throws IOException {
|
||||||
|
remove(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract void remove(int n) throws IOException;
|
||||||
|
|
||||||
|
public void clear() throws IOException {
|
||||||
|
remove(size());
|
||||||
|
}
|
||||||
|
|
||||||
|
public interface Converter<T> {
|
||||||
|
|
||||||
|
T from(byte[] source) throws IOException;
|
||||||
|
|
||||||
|
void toStream(T value, OutputStream sink) throws IOException;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,544 @@
|
||||||
|
package org.xbib.datastructures.queue.tape;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.RandomAccessFile;
|
||||||
|
import java.nio.channels.FileChannel;
|
||||||
|
import java.util.ConcurrentModificationException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.NoSuchElementException;
|
||||||
|
|
||||||
|
import static java.lang.Math.min;
|
||||||
|
|
||||||
|
public final class QueueFile implements Closeable, Iterable<byte[]> {
|
||||||
|
|
||||||
|
private static final int VERSIONED_HEADER = 0x80000001;
|
||||||
|
|
||||||
|
public static final int INITIAL_LENGTH = 4096; // one file system block
|
||||||
|
|
||||||
|
private static final byte[] ZEROES = new byte[INITIAL_LENGTH];
|
||||||
|
|
||||||
|
public final RandomAccessFile raf;
|
||||||
|
|
||||||
|
final File file;
|
||||||
|
|
||||||
|
final boolean versioned;
|
||||||
|
|
||||||
|
final int headerLength;
|
||||||
|
|
||||||
|
public long fileLength;
|
||||||
|
|
||||||
|
int elementCount;
|
||||||
|
|
||||||
|
Element first;
|
||||||
|
|
||||||
|
private Element last;
|
||||||
|
|
||||||
|
private final byte[] buffer = new byte[32];
|
||||||
|
|
||||||
|
int modCount = 0;
|
||||||
|
|
||||||
|
private final boolean zero;
|
||||||
|
|
||||||
|
boolean closed;
|
||||||
|
|
||||||
|
static RandomAccessFile initializeFromFile(File file, boolean forceLegacy) throws IOException {
|
||||||
|
if (!file.exists()) {
|
||||||
|
File tempFile = new File(file.getPath() + ".tmp");
|
||||||
|
try (RandomAccessFile raf = open(tempFile)) {
|
||||||
|
raf.setLength(INITIAL_LENGTH);
|
||||||
|
raf.seek(0);
|
||||||
|
if (forceLegacy) {
|
||||||
|
raf.writeInt(INITIAL_LENGTH);
|
||||||
|
} else {
|
||||||
|
raf.writeInt(VERSIONED_HEADER);
|
||||||
|
raf.writeLong(INITIAL_LENGTH);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!tempFile.renameTo(file)) {
|
||||||
|
throw new IOException("Rename failed!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return open(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static RandomAccessFile open(File file) throws FileNotFoundException {
|
||||||
|
return new RandomAccessFile(file, "rwd");
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueFile(File file, RandomAccessFile raf, boolean zero, boolean forceLegacy) throws IOException {
|
||||||
|
this.file = file;
|
||||||
|
this.raf = raf;
|
||||||
|
this.zero = zero;
|
||||||
|
raf.seek(0);
|
||||||
|
raf.readFully(buffer);
|
||||||
|
versioned = !forceLegacy && (buffer[0] & 0x80) != 0;
|
||||||
|
long firstOffset;
|
||||||
|
long lastOffset;
|
||||||
|
if (versioned) {
|
||||||
|
headerLength = 32;
|
||||||
|
int version = readInt(buffer, 0) & 0x7FFFFFFF;
|
||||||
|
if (version != 1) {
|
||||||
|
throw new IOException("Unable to read version " + version + " format. Supported versions are 1 and legacy.");
|
||||||
|
}
|
||||||
|
fileLength = readLong(buffer, 4);
|
||||||
|
elementCount = readInt(buffer, 12);
|
||||||
|
firstOffset = readLong(buffer, 16);
|
||||||
|
lastOffset = readLong(buffer, 24);
|
||||||
|
} else {
|
||||||
|
headerLength = 16;
|
||||||
|
fileLength = readInt(buffer, 0);
|
||||||
|
elementCount = readInt(buffer, 4);
|
||||||
|
firstOffset = readInt(buffer, 8);
|
||||||
|
lastOffset = readInt(buffer, 12);
|
||||||
|
}
|
||||||
|
if (fileLength > raf.length()) {
|
||||||
|
throw new IOException("File is truncated. Expected length: " + fileLength + ", Actual length: " + raf.length());
|
||||||
|
} else if (fileLength <= headerLength) {
|
||||||
|
throw new IOException("File is corrupt; length stored in header (" + fileLength + ") is invalid.");
|
||||||
|
}
|
||||||
|
first = readElement(firstOffset);
|
||||||
|
last = readElement(lastOffset);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void writeInt(byte[] buffer, int offset, int value) {
|
||||||
|
buffer[offset] = (byte) (value >> 24);
|
||||||
|
buffer[offset + 1] = (byte) (value >> 16);
|
||||||
|
buffer[offset + 2] = (byte) (value >> 8);
|
||||||
|
buffer[offset + 3] = (byte) value;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int readInt(byte[] buffer, int offset) {
|
||||||
|
return ((buffer[offset] & 0xff) << 24)
|
||||||
|
+ ((buffer[offset + 1] & 0xff) << 16)
|
||||||
|
+ ((buffer[offset + 2] & 0xff) << 8)
|
||||||
|
+ (buffer[offset + 3] & 0xff);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void writeLong(byte[] buffer, int offset, long value) {
|
||||||
|
buffer[offset] = (byte) (value >> 56);
|
||||||
|
buffer[offset + 1] = (byte) (value >> 48);
|
||||||
|
buffer[offset + 2] = (byte) (value >> 40);
|
||||||
|
buffer[offset + 3] = (byte) (value >> 32);
|
||||||
|
buffer[offset + 4] = (byte) (value >> 24);
|
||||||
|
buffer[offset + 5] = (byte) (value >> 16);
|
||||||
|
buffer[offset + 6] = (byte) (value >> 8);
|
||||||
|
buffer[offset + 7] = (byte) value;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long readLong(byte[] buffer, int offset) {
|
||||||
|
return ((buffer[offset] & 0xffL) << 56)
|
||||||
|
+ ((buffer[offset + 1] & 0xffL) << 48)
|
||||||
|
+ ((buffer[offset + 2] & 0xffL) << 40)
|
||||||
|
+ ((buffer[offset + 3] & 0xffL) << 32)
|
||||||
|
+ ((buffer[offset + 4] & 0xffL) << 24)
|
||||||
|
+ ((buffer[offset + 5] & 0xffL) << 16)
|
||||||
|
+ ((buffer[offset + 6] & 0xffL) << 8)
|
||||||
|
+ (buffer[offset + 7] & 0xffL);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeHeader(long fileLength, int elementCount, long firstPosition, long lastPosition)
|
||||||
|
throws IOException {
|
||||||
|
raf.seek(0);
|
||||||
|
|
||||||
|
if (versioned) {
|
||||||
|
writeInt(buffer, 0, VERSIONED_HEADER);
|
||||||
|
writeLong(buffer, 4, fileLength);
|
||||||
|
writeInt(buffer, 12, elementCount);
|
||||||
|
writeLong(buffer, 16, firstPosition);
|
||||||
|
writeLong(buffer, 24, lastPosition);
|
||||||
|
raf.write(buffer, 0, 32);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Legacy queue header.
|
||||||
|
writeInt(buffer, 0, (int) fileLength); // Signed, so leading bit is always 0 aka legacy.
|
||||||
|
writeInt(buffer, 4, elementCount);
|
||||||
|
writeInt(buffer, 8, (int) firstPosition);
|
||||||
|
writeInt(buffer, 12, (int) lastPosition);
|
||||||
|
raf.write(buffer, 0, 16);
|
||||||
|
}
|
||||||
|
|
||||||
|
Element readElement(long position) throws IOException {
|
||||||
|
if (position == 0) return Element.NULL;
|
||||||
|
ringRead(position, buffer, 0, Element.HEADER_LENGTH);
|
||||||
|
int length = readInt(buffer, 0);
|
||||||
|
return new Element(position, length);
|
||||||
|
}
|
||||||
|
|
||||||
|
long wrapPosition(long position) {
|
||||||
|
return position < fileLength ? position
|
||||||
|
: headerLength + position - fileLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ringWrite(long position, byte[] buffer, int offset, int count) throws IOException {
|
||||||
|
position = wrapPosition(position);
|
||||||
|
if (position + count <= fileLength) {
|
||||||
|
raf.seek(position);
|
||||||
|
raf.write(buffer, offset, count);
|
||||||
|
} else {
|
||||||
|
int beforeEof = (int) (fileLength - position);
|
||||||
|
raf.seek(position);
|
||||||
|
raf.write(buffer, offset, beforeEof);
|
||||||
|
raf.seek(headerLength);
|
||||||
|
raf.write(buffer, offset + beforeEof, count - beforeEof);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ringErase(long position, long length) throws IOException {
|
||||||
|
while (length > 0) {
|
||||||
|
int chunk = (int) min(length, ZEROES.length);
|
||||||
|
ringWrite(position, ZEROES, 0, chunk);
|
||||||
|
length -= chunk;
|
||||||
|
position += chunk;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ringRead(long position, byte[] buffer, int offset, int count) throws IOException {
|
||||||
|
position = wrapPosition(position);
|
||||||
|
if (position + count <= fileLength) {
|
||||||
|
raf.seek(position);
|
||||||
|
raf.readFully(buffer, offset, count);
|
||||||
|
} else {
|
||||||
|
int beforeEof = (int) (fileLength - position);
|
||||||
|
raf.seek(position);
|
||||||
|
raf.readFully(buffer, offset, beforeEof);
|
||||||
|
raf.seek(headerLength);
|
||||||
|
raf.readFully(buffer, offset + beforeEof, count - beforeEof);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(byte[] data) throws IOException {
|
||||||
|
add(data, 0, data.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(byte[] data, int offset, int count) throws IOException {
|
||||||
|
if (data == null) {
|
||||||
|
throw new NullPointerException("data == null");
|
||||||
|
}
|
||||||
|
if ((offset | count) < 0 || count > data.length - offset) {
|
||||||
|
throw new IndexOutOfBoundsException();
|
||||||
|
}
|
||||||
|
if (closed) throw new IllegalStateException("closed");
|
||||||
|
expandIfNecessary(count);
|
||||||
|
boolean wasEmpty = isEmpty();
|
||||||
|
long position = wasEmpty ? headerLength : wrapPosition(last.position + Element.HEADER_LENGTH + last.length);
|
||||||
|
Element newLast = new Element(position, count);
|
||||||
|
writeInt(buffer, 0, count);
|
||||||
|
ringWrite(newLast.position, buffer, 0, Element.HEADER_LENGTH);
|
||||||
|
ringWrite(newLast.position + Element.HEADER_LENGTH, data, offset, count);
|
||||||
|
long firstPosition = wasEmpty ? newLast.position : first.position;
|
||||||
|
writeHeader(fileLength, elementCount + 1, firstPosition, newLast.position);
|
||||||
|
last = newLast;
|
||||||
|
elementCount++;
|
||||||
|
modCount++;
|
||||||
|
if (wasEmpty) first = last;
|
||||||
|
}
|
||||||
|
|
||||||
|
private long usedBytes() {
|
||||||
|
if (elementCount == 0) return headerLength;
|
||||||
|
if (last.position >= first.position) {
|
||||||
|
return (last.position - first.position)
|
||||||
|
+ Element.HEADER_LENGTH + last.length
|
||||||
|
+ headerLength;
|
||||||
|
} else {
|
||||||
|
return last.position
|
||||||
|
+ Element.HEADER_LENGTH + last.length
|
||||||
|
+ fileLength - first.position;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private long remainingBytes() {
|
||||||
|
return fileLength - usedBytes();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEmpty() {
|
||||||
|
return elementCount == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If necessary, expands the file to accommodate an additional element of the given length.
|
||||||
|
*
|
||||||
|
* @param dataLength length of data being added
|
||||||
|
*/
|
||||||
|
private void expandIfNecessary(long dataLength) throws IOException {
|
||||||
|
long elementLength = Element.HEADER_LENGTH + dataLength;
|
||||||
|
long remainingBytes = remainingBytes();
|
||||||
|
if (remainingBytes >= elementLength) return;
|
||||||
|
|
||||||
|
// Expand.
|
||||||
|
long previousLength = fileLength;
|
||||||
|
long newLength;
|
||||||
|
// Double the length until we can fit the new data.
|
||||||
|
do {
|
||||||
|
remainingBytes += previousLength;
|
||||||
|
newLength = previousLength << 1;
|
||||||
|
previousLength = newLength;
|
||||||
|
} while (remainingBytes < elementLength);
|
||||||
|
|
||||||
|
setLength(newLength);
|
||||||
|
|
||||||
|
// Calculate the position of the tail end of the data in the ring buffer
|
||||||
|
long endOfLastElement = wrapPosition(last.position + Element.HEADER_LENGTH + last.length);
|
||||||
|
long count = 0;
|
||||||
|
// If the buffer is split, we need to make it contiguous
|
||||||
|
if (endOfLastElement <= first.position) {
|
||||||
|
FileChannel channel = raf.getChannel();
|
||||||
|
channel.position(fileLength); // destination position
|
||||||
|
count = endOfLastElement - headerLength;
|
||||||
|
if (channel.transferTo(headerLength, count, channel) != count) {
|
||||||
|
throw new AssertionError("Copied insufficient number of bytes!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit the expansion.
|
||||||
|
if (last.position < first.position) {
|
||||||
|
long newLastPosition = fileLength + last.position - headerLength;
|
||||||
|
writeHeader(newLength, elementCount, first.position, newLastPosition);
|
||||||
|
last = new Element(newLastPosition, last.length);
|
||||||
|
} else {
|
||||||
|
writeHeader(newLength, elementCount, first.position, last.position);
|
||||||
|
}
|
||||||
|
|
||||||
|
fileLength = newLength;
|
||||||
|
|
||||||
|
if (zero) {
|
||||||
|
ringErase(headerLength, count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the length of the file.
|
||||||
|
*/
|
||||||
|
private void setLength(long newLength) throws IOException {
|
||||||
|
// Set new file length (considered metadata) and sync it to storage.
|
||||||
|
raf.setLength(newLength);
|
||||||
|
raf.getChannel().force(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads the eldest element. Returns null if the queue is empty.
|
||||||
|
*/
|
||||||
|
public byte[] peek() throws IOException {
|
||||||
|
if (closed) throw new IllegalStateException("closed");
|
||||||
|
if (isEmpty()) return null;
|
||||||
|
int length = first.length;
|
||||||
|
byte[] data = new byte[length];
|
||||||
|
ringRead(first.position + Element.HEADER_LENGTH, data, 0, length);
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an iterator over elements in this QueueFile.
|
||||||
|
*
|
||||||
|
* <p>The iterator disallows modifications to be made to the QueueFile during iteration. Removing
|
||||||
|
* elements from the head of the QueueFile is permitted during iteration using
|
||||||
|
* {@link Iterator#remove()}.
|
||||||
|
*
|
||||||
|
* <p>The iterator may throw an unchecked {@link IOException} during {@link Iterator#next()}
|
||||||
|
* or {@link Iterator#remove()}.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Iterator<byte[]> iterator() {
|
||||||
|
return new ElementIterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class ElementIterator implements Iterator<byte[]> {
|
||||||
|
int nextElementIndex = 0;
|
||||||
|
private long nextElementPosition = first.position;
|
||||||
|
int expectedModCount = modCount;
|
||||||
|
|
||||||
|
ElementIterator() {
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkForComodification() {
|
||||||
|
if (modCount != expectedModCount) throw new ConcurrentModificationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
if (closed) throw new IllegalStateException("closed");
|
||||||
|
checkForComodification();
|
||||||
|
return nextElementIndex != elementCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] next() {
|
||||||
|
if (closed) throw new IllegalStateException("closed");
|
||||||
|
checkForComodification();
|
||||||
|
if (isEmpty()) throw new NoSuchElementException();
|
||||||
|
if (nextElementIndex >= elementCount) throw new NoSuchElementException();
|
||||||
|
try {
|
||||||
|
Element current = readElement(nextElementPosition);
|
||||||
|
byte[] buffer = new byte[current.length];
|
||||||
|
nextElementPosition = wrapPosition(current.position + Element.HEADER_LENGTH);
|
||||||
|
ringRead(nextElementPosition, buffer, 0, current.length);
|
||||||
|
nextElementPosition = wrapPosition(current.position + Element.HEADER_LENGTH + current.length);
|
||||||
|
nextElementIndex++;
|
||||||
|
return buffer;
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw QueueFile.<Error>getSneakyThrowable(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
checkForComodification();
|
||||||
|
if (isEmpty()) throw new NoSuchElementException();
|
||||||
|
if (nextElementIndex != 1) {
|
||||||
|
throw new UnsupportedOperationException("Removal is only permitted from the head.");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
QueueFile.this.remove();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw QueueFile.<Error>getSneakyThrowable(e);
|
||||||
|
}
|
||||||
|
expectedModCount = modCount;
|
||||||
|
nextElementIndex--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int size() {
|
||||||
|
return elementCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void remove() throws IOException {
|
||||||
|
remove(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void remove(int n) throws IOException {
|
||||||
|
if (n < 0) {
|
||||||
|
throw new IllegalArgumentException("Cannot remove negative (" + n + ") number of elements.");
|
||||||
|
}
|
||||||
|
if (n == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (n == elementCount) {
|
||||||
|
clear();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (isEmpty()) {
|
||||||
|
throw new NoSuchElementException();
|
||||||
|
}
|
||||||
|
if (n > elementCount) {
|
||||||
|
throw new IllegalArgumentException("Cannot remove more elements (" + n + ") than present in queue (" + elementCount + ").");
|
||||||
|
}
|
||||||
|
long eraseStartPosition = first.position;
|
||||||
|
long eraseTotalLength = 0;
|
||||||
|
long newFirstPosition = first.position;
|
||||||
|
int newFirstLength = first.length;
|
||||||
|
for (int i = 0; i < n; i++) {
|
||||||
|
eraseTotalLength += Element.HEADER_LENGTH + newFirstLength;
|
||||||
|
newFirstPosition = wrapPosition(newFirstPosition + Element.HEADER_LENGTH + newFirstLength);
|
||||||
|
ringRead(newFirstPosition, buffer, 0, Element.HEADER_LENGTH);
|
||||||
|
newFirstLength = readInt(buffer, 0);
|
||||||
|
}
|
||||||
|
writeHeader(fileLength, elementCount - n, newFirstPosition, last.position);
|
||||||
|
elementCount -= n;
|
||||||
|
modCount++;
|
||||||
|
first = new Element(newFirstPosition, newFirstLength);
|
||||||
|
if (zero) {
|
||||||
|
ringErase(eraseStartPosition, eraseTotalLength);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clear() throws IOException {
|
||||||
|
if (closed) throw new IllegalStateException("closed");
|
||||||
|
writeHeader(INITIAL_LENGTH, 0, 0, 0);
|
||||||
|
if (zero) {
|
||||||
|
raf.seek(headerLength);
|
||||||
|
raf.write(ZEROES, 0, INITIAL_LENGTH - headerLength);
|
||||||
|
}
|
||||||
|
elementCount = 0;
|
||||||
|
first = Element.NULL;
|
||||||
|
last = Element.NULL;
|
||||||
|
if (fileLength > INITIAL_LENGTH) setLength(INITIAL_LENGTH);
|
||||||
|
fileLength = INITIAL_LENGTH;
|
||||||
|
modCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
public File file() {
|
||||||
|
return file;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
closed = true;
|
||||||
|
raf.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "QueueFile{"
|
||||||
|
+ "file=" + file
|
||||||
|
+ ", zero=" + zero
|
||||||
|
+ ", versioned=" + versioned
|
||||||
|
+ ", length=" + fileLength
|
||||||
|
+ ", size=" + elementCount
|
||||||
|
+ ", first=" + first
|
||||||
|
+ ", last=" + last
|
||||||
|
+ '}';
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Element {
|
||||||
|
static final Element NULL = new Element(0, 0);
|
||||||
|
public static final int HEADER_LENGTH = 4;
|
||||||
|
final long position;
|
||||||
|
final int length;
|
||||||
|
public Element(long position, int length) {
|
||||||
|
this.position = position;
|
||||||
|
this.length = length;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return getClass().getSimpleName()
|
||||||
|
+ "[position=" + position
|
||||||
|
+ ", length=" + length
|
||||||
|
+ "]";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class Builder {
|
||||||
|
final File file;
|
||||||
|
boolean zero = true;
|
||||||
|
boolean forceLegacy = false;
|
||||||
|
|
||||||
|
public Builder(File file) {
|
||||||
|
if (file == null) {
|
||||||
|
throw new NullPointerException("file == null");
|
||||||
|
}
|
||||||
|
this.file = file;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder zero(boolean zero) {
|
||||||
|
this.zero = zero;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder forceLegacy(boolean forceLegacy) {
|
||||||
|
this.forceLegacy = forceLegacy;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueFile build() throws IOException {
|
||||||
|
RandomAccessFile raf = initializeFromFile(file, forceLegacy);
|
||||||
|
QueueFile qf = null;
|
||||||
|
try {
|
||||||
|
qf = new QueueFile(file, raf, zero, forceLegacy);
|
||||||
|
return qf;
|
||||||
|
} finally {
|
||||||
|
if (qf == null) {
|
||||||
|
raf.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings({"unchecked", "TypeParameterUnusedInFormals"})
|
||||||
|
static <T extends Throwable> T getSneakyThrowable(Throwable t) throws T {
|
||||||
|
throw (T) t;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,231 @@
|
||||||
|
package com.squareup.tape2;
|
||||||
|
|
||||||
|
import com.squareup.burst.BurstJUnit4;
|
||||||
|
import com.squareup.burst.annotation.Burst;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.ConcurrentModificationException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.NoSuchElementException;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.xbib.datastructures.queue.tape.FileObjectQueue;
|
||||||
|
import org.xbib.datastructures.queue.tape.ObjectQueue;
|
||||||
|
import org.xbib.datastructures.queue.tape.QueueFile;
|
||||||
|
|
||||||
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
@RunWith(BurstJUnit4.class)
|
||||||
|
public class ObjectQueueTest {
|
||||||
|
public enum QueueFactory {
|
||||||
|
FILE() {
|
||||||
|
@Override
|
||||||
|
public <T> ObjectQueue<T> create(QueueFile queueFile, FileObjectQueue.Converter<T> converter)
|
||||||
|
throws IOException {
|
||||||
|
return ObjectQueue.create(queueFile, converter);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
MEMORY() {
|
||||||
|
@Override
|
||||||
|
public <T> ObjectQueue<T> create(QueueFile file, FileObjectQueue.Converter<T> converter) {
|
||||||
|
return ObjectQueue.createInMemory();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public abstract <T> ObjectQueue<T> create(QueueFile queueFile,
|
||||||
|
FileObjectQueue.Converter<T> converter) throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Rule public TemporaryFolder folder = new TemporaryFolder();
|
||||||
|
@Burst QueueFactory factory;
|
||||||
|
ObjectQueue<String> queue;
|
||||||
|
|
||||||
|
@Before public void setUp() throws IOException {
|
||||||
|
File parent = folder.getRoot();
|
||||||
|
File file = new File(parent, "object-queue");
|
||||||
|
QueueFile queueFile = new QueueFile.Builder(file).build();
|
||||||
|
|
||||||
|
queue = factory.create(queueFile, new StringConverter());
|
||||||
|
queue.add("one");
|
||||||
|
queue.add("two");
|
||||||
|
queue.add("three");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void size() throws IOException {
|
||||||
|
assertThat(queue.size()).isEqualTo(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void peek() throws IOException {
|
||||||
|
assertThat(queue.peek()).isEqualTo("one");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void peekMultiple() throws IOException {
|
||||||
|
assertThat(queue.peek(2)).containsExactly("one", "two");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void peekMaxCanExceedQueueDepth() throws IOException {
|
||||||
|
assertThat(queue.peek(6)).containsExactly("one", "two", "three");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void asList() throws IOException {
|
||||||
|
assertThat(queue.asList()).containsExactly("one", "two", "three");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void remove() throws IOException {
|
||||||
|
queue.remove();
|
||||||
|
|
||||||
|
assertThat(queue.asList()).containsExactly("two", "three");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void removeMultiple() throws IOException {
|
||||||
|
queue.remove(2);
|
||||||
|
|
||||||
|
assertThat(queue.asList()).containsExactly("three");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void clear() throws IOException {
|
||||||
|
queue.clear();
|
||||||
|
|
||||||
|
assertThat(queue.size()).isEqualTo(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void isEmpty() throws IOException {
|
||||||
|
assertThat(queue.isEmpty()).isFalse();
|
||||||
|
|
||||||
|
queue.clear();
|
||||||
|
|
||||||
|
assertThat(queue.isEmpty()).isTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIterator() throws IOException {
|
||||||
|
final List<String> saw = new ArrayList<>();
|
||||||
|
for (String pojo : queue) {
|
||||||
|
saw.add(pojo);
|
||||||
|
}
|
||||||
|
assertThat(saw).containsExactly("one", "two", "three");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorNextThrowsWhenEmpty() throws IOException {
|
||||||
|
queue.clear();
|
||||||
|
Iterator<String> iterator = queue.iterator();
|
||||||
|
|
||||||
|
try {
|
||||||
|
iterator.next();
|
||||||
|
fail();
|
||||||
|
} catch (NoSuchElementException ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorNextThrowsWhenExhausted() throws IOException {
|
||||||
|
Iterator<String> iterator = queue.iterator();
|
||||||
|
iterator.next();
|
||||||
|
iterator.next();
|
||||||
|
iterator.next();
|
||||||
|
|
||||||
|
try {
|
||||||
|
iterator.next();
|
||||||
|
fail();
|
||||||
|
} catch (NoSuchElementException ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorRemove() throws IOException {
|
||||||
|
Iterator<String> iterator = queue.iterator();
|
||||||
|
|
||||||
|
iterator.next();
|
||||||
|
iterator.remove();
|
||||||
|
assertThat(queue.asList()).containsExactly("two", "three");
|
||||||
|
|
||||||
|
iterator.next();
|
||||||
|
iterator.remove();
|
||||||
|
assertThat(queue.asList()).containsExactly("three");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorRemoveDisallowsConcurrentModification() throws IOException {
|
||||||
|
Iterator<String> iterator = queue.iterator();
|
||||||
|
iterator.next();
|
||||||
|
queue.remove();
|
||||||
|
|
||||||
|
try {
|
||||||
|
iterator.remove();
|
||||||
|
fail();
|
||||||
|
} catch (ConcurrentModificationException ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorHasNextDisallowsConcurrentModification() throws IOException {
|
||||||
|
Iterator<String> iterator = queue.iterator();
|
||||||
|
iterator.next();
|
||||||
|
queue.remove();
|
||||||
|
|
||||||
|
try {
|
||||||
|
iterator.hasNext();
|
||||||
|
fail();
|
||||||
|
} catch (ConcurrentModificationException ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorDisallowsConcurrentModificationWithClear() throws IOException {
|
||||||
|
Iterator<String> iterator = queue.iterator();
|
||||||
|
iterator.next();
|
||||||
|
queue.clear();
|
||||||
|
|
||||||
|
try {
|
||||||
|
iterator.hasNext();
|
||||||
|
fail();
|
||||||
|
} catch (ConcurrentModificationException ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorOnlyRemovesFromHead() throws IOException {
|
||||||
|
Iterator<String> iterator = queue.iterator();
|
||||||
|
iterator.next();
|
||||||
|
iterator.next();
|
||||||
|
|
||||||
|
try {
|
||||||
|
iterator.remove();
|
||||||
|
fail();
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
assertThat(ex).hasMessageThat().isEqualTo("Removal is only permitted from the head.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void iteratorThrowsIOException() throws IOException {
|
||||||
|
File parent = folder.getRoot();
|
||||||
|
File file = new File(parent, "object-queue");
|
||||||
|
QueueFile queueFile = new QueueFile.Builder(file).build();
|
||||||
|
ObjectQueue<Object> queue = ObjectQueue.create(queueFile, new ObjectQueue.Converter<Object>() {
|
||||||
|
@Override public String from(byte[] bytes) throws IOException {
|
||||||
|
throw new IOException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void toStream(Object o, OutputStream bytes) {
|
||||||
|
}
|
||||||
|
});
|
||||||
|
queue.add(new Object());
|
||||||
|
Iterator<Object> iterator = queue.iterator();
|
||||||
|
try {
|
||||||
|
iterator.next();
|
||||||
|
fail();
|
||||||
|
} catch (Exception ioe) {
|
||||||
|
assertThat(ioe).isInstanceOf(IOException.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class StringConverter implements FileObjectQueue.Converter<String> {
|
||||||
|
@Override public String from(byte[] bytes) throws IOException {
|
||||||
|
return new String(bytes, "UTF-8");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void toStream(String s, OutputStream os) throws IOException {
|
||||||
|
os.write(s.getBytes("UTF-8"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,103 @@
|
||||||
|
package com.squareup.tape2;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.JUnit4;
|
||||||
|
import org.xbib.datastructures.queue.tape.FileObjectQueue;
|
||||||
|
import org.xbib.datastructures.queue.tape.QueueFile;
|
||||||
|
|
||||||
|
import static com.squareup.tape2.QueueTestUtils.EMPTY_SERIALIZED_QUEUE;
|
||||||
|
import static com.squareup.tape2.QueueTestUtils.FRESH_SERIALIZED_QUEUE;
|
||||||
|
import static com.squareup.tape2.QueueTestUtils.ONE_ENTRY_SERIALIZED_QUEUE;
|
||||||
|
import static com.squareup.tape2.QueueTestUtils.TRUNCATED_EMPTY_SERIALIZED_QUEUE;
|
||||||
|
import static com.squareup.tape2.QueueTestUtils.TRUNCATED_ONE_ENTRY_SERIALIZED_QUEUE;
|
||||||
|
import static com.squareup.tape2.QueueTestUtils.copyTestFile;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
@RunWith(JUnit4.class)
|
||||||
|
public final class QueueFileLoadingTest {
|
||||||
|
|
||||||
|
private File testFile;
|
||||||
|
|
||||||
|
@After public void cleanup() {
|
||||||
|
assertTrue("Failed to delete test file " + testFile.getPath(), testFile.delete());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testMissingFileInitializes() throws Exception {
|
||||||
|
testFile = File.createTempFile(FRESH_SERIALIZED_QUEUE, "test");
|
||||||
|
assertTrue(testFile.delete());
|
||||||
|
assertFalse(testFile.exists());
|
||||||
|
QueueFile queue = new QueueFile.Builder(testFile).build();
|
||||||
|
assertEquals(0, queue.size());
|
||||||
|
assertTrue(testFile.exists());
|
||||||
|
queue.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testEmptyFileInitializes() throws Exception {
|
||||||
|
testFile = copyTestFile(EMPTY_SERIALIZED_QUEUE);
|
||||||
|
QueueFile queue = new QueueFile.Builder(testFile).build();
|
||||||
|
assertEquals(0, queue.size());
|
||||||
|
queue.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testSingleEntryFileInitializes() throws Exception {
|
||||||
|
testFile = copyTestFile(ONE_ENTRY_SERIALIZED_QUEUE);
|
||||||
|
QueueFile queue = new QueueFile.Builder(testFile).build();
|
||||||
|
assertEquals(1, queue.size());
|
||||||
|
queue.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testTruncatedEmptyFileThrows() throws Exception {
|
||||||
|
testFile = copyTestFile(TRUNCATED_EMPTY_SERIALIZED_QUEUE);
|
||||||
|
new QueueFile.Builder(testFile).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testTruncatedOneEntryFileThrows() throws Exception {
|
||||||
|
testFile = copyTestFile(TRUNCATED_ONE_ENTRY_SERIALIZED_QUEUE);
|
||||||
|
new QueueFile.Builder(testFile).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testCreateWithReadOnlyFileThrowsException() throws Exception {
|
||||||
|
testFile = copyTestFile(TRUNCATED_ONE_ENTRY_SERIALIZED_QUEUE);
|
||||||
|
assertTrue(testFile.setWritable(false));
|
||||||
|
|
||||||
|
// Should throw an exception.
|
||||||
|
new QueueFile.Builder(testFile).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testAddWithReadOnlyFileMissesMonitor() throws Exception {
|
||||||
|
testFile = copyTestFile(EMPTY_SERIALIZED_QUEUE);
|
||||||
|
|
||||||
|
QueueFile qf = new QueueFile.Builder(testFile).build();
|
||||||
|
|
||||||
|
// Should throw an exception.
|
||||||
|
FileObjectQueue<String> queue =
|
||||||
|
new FileObjectQueue<>(qf, new FileObjectQueue.Converter<String>() {
|
||||||
|
@Override public String from(byte[] bytes) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void toStream(String o, OutputStream bytes)
|
||||||
|
throws IOException {
|
||||||
|
throw new IOException("fake Permission denied");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Should throw an exception.
|
||||||
|
try {
|
||||||
|
queue.add("trouble");
|
||||||
|
} finally {
|
||||||
|
queue.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,994 @@
|
||||||
|
package com.squareup.tape2;
|
||||||
|
|
||||||
|
import org.xbib.datastructures.queue.tape.QueueFile;
|
||||||
|
import org.xbib.datastructures.queue.tape.QueueFile.Element;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.RandomAccessFile;
|
||||||
|
import java.util.ArrayDeque;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.ConcurrentModificationException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.NoSuchElementException;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
import okio.BufferedSource;
|
||||||
|
import okio.Okio;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
|
import static org.xbib.datastructures.queue.tape.QueueFile.INITIAL_LENGTH;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for QueueFile.
|
||||||
|
*
|
||||||
|
* @author Bob Lee (bob@squareup.com)
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("ResultOfMethodCallIgnored")
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class QueueFileTest {
|
||||||
|
@Parameterized.Parameters(name = "{0}")
|
||||||
|
public static List<Object[]> parameters() {
|
||||||
|
return Arrays.asList(
|
||||||
|
new Object[] {"Legacy" , true , 16},
|
||||||
|
new Object[] {"Versioned", false, 32}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(QueueFileTest.class.getName());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Takes up 33401 bytes in the queue (N*(N+1)/2+4*N). Picked 254 instead of 255 so that the number
|
||||||
|
* of bytes isn't a multiple of 4.
|
||||||
|
*/
|
||||||
|
private static final int N = 254;
|
||||||
|
private static byte[][] values = new byte[N][];
|
||||||
|
|
||||||
|
static {
|
||||||
|
for (int i = 0; i < N; i++) {
|
||||||
|
byte[] value = new byte[i];
|
||||||
|
// Example: values[3] = { 3, 2, 1 }
|
||||||
|
for (int ii = 0; ii < i; ii++)
|
||||||
|
value[ii] = (byte) (i - ii);
|
||||||
|
values[i] = value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final boolean forceLegacy;
|
||||||
|
private final int headerLength;
|
||||||
|
|
||||||
|
@Rule public TemporaryFolder folder = new TemporaryFolder();
|
||||||
|
private File file;
|
||||||
|
|
||||||
|
public QueueFileTest(String name, boolean forceLegacy, int headerLength) {
|
||||||
|
this.forceLegacy = forceLegacy;
|
||||||
|
this.headerLength = headerLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
private QueueFile newQueueFile() throws IOException {
|
||||||
|
return newQueueFile(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private QueueFile newQueueFile(RandomAccessFile raf) throws IOException {
|
||||||
|
return new QueueFile(this.file, raf, true, forceLegacy);
|
||||||
|
}
|
||||||
|
|
||||||
|
private QueueFile newQueueFile(boolean zero) throws IOException {
|
||||||
|
return new QueueFile.Builder(file).zero(zero).forceLegacy(forceLegacy).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before public void setUp() throws Exception {
|
||||||
|
File parent = folder.getRoot();
|
||||||
|
file = new File(parent, "queue-file");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testAddOneElement() throws IOException {
|
||||||
|
// This test ensures that we update 'first' correctly.
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
byte[] expected = values[253];
|
||||||
|
queue.add(expected);
|
||||||
|
assertThat(queue.peek()).isEqualTo(expected);
|
||||||
|
queue.close();
|
||||||
|
queue = newQueueFile();
|
||||||
|
assertThat(queue.peek()).isEqualTo(expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testClearErases() throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
byte[] expected = values[253];
|
||||||
|
queue.add(expected);
|
||||||
|
|
||||||
|
// Confirm that the data was in the file before we cleared.
|
||||||
|
byte[] data = new byte[expected.length];
|
||||||
|
queue.raf.seek(headerLength + Element.HEADER_LENGTH);
|
||||||
|
queue.raf.readFully(data, 0, expected.length);
|
||||||
|
assertThat(data).isEqualTo(expected);
|
||||||
|
|
||||||
|
queue.clear();
|
||||||
|
|
||||||
|
// Should have been erased.
|
||||||
|
queue.raf.seek(headerLength + Element.HEADER_LENGTH);
|
||||||
|
queue.raf.readFully(data, 0, expected.length);
|
||||||
|
assertThat(data).isEqualTo(new byte[expected.length]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testClearDoesNotCorrupt() throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
byte[] stuff = values[253];
|
||||||
|
queue.add(stuff);
|
||||||
|
queue.clear();
|
||||||
|
|
||||||
|
queue = newQueueFile();
|
||||||
|
assertThat(queue.isEmpty()).isTrue();
|
||||||
|
assertThat(queue.peek()).isNull();
|
||||||
|
|
||||||
|
queue.add(values[25]);
|
||||||
|
assertThat(queue.peek()).isEqualTo(values[25]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void removeErasesEagerly() throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
|
||||||
|
byte[] firstStuff = values[127];
|
||||||
|
queue.add(firstStuff);
|
||||||
|
|
||||||
|
byte[] secondStuff = values[253];
|
||||||
|
queue.add(secondStuff);
|
||||||
|
|
||||||
|
// Confirm that first stuff was in the file before we remove.
|
||||||
|
byte[] data = new byte[firstStuff.length];
|
||||||
|
queue.raf.seek(headerLength + Element.HEADER_LENGTH);
|
||||||
|
queue.raf.readFully(data, 0, firstStuff.length);
|
||||||
|
assertThat(data).isEqualTo(firstStuff);
|
||||||
|
|
||||||
|
queue.remove();
|
||||||
|
|
||||||
|
// Next record is intact
|
||||||
|
assertThat(queue.peek()).isEqualTo(secondStuff);
|
||||||
|
|
||||||
|
// First should have been erased.
|
||||||
|
queue.raf.seek(headerLength + Element.HEADER_LENGTH);
|
||||||
|
queue.raf.readFully(data, 0, firstStuff.length);
|
||||||
|
assertThat(data).isEqualTo(new byte[firstStuff.length]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testZeroSizeInHeaderThrows() throws IOException {
|
||||||
|
RandomAccessFile emptyFile = new RandomAccessFile(file, "rwd");
|
||||||
|
emptyFile.setLength(INITIAL_LENGTH);
|
||||||
|
emptyFile.getChannel().force(true);
|
||||||
|
emptyFile.close();
|
||||||
|
|
||||||
|
try {
|
||||||
|
newQueueFile();
|
||||||
|
fail("Should have thrown about bad header length");
|
||||||
|
} catch (IOException ex) {
|
||||||
|
assertThat(ex).hasMessageThat()
|
||||||
|
.isEqualTo("File is corrupt; length stored in header (0) is invalid.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testSizeLessThanHeaderThrows() throws IOException {
|
||||||
|
RandomAccessFile emptyFile = new RandomAccessFile(file, "rwd");
|
||||||
|
emptyFile.setLength(INITIAL_LENGTH);
|
||||||
|
if (forceLegacy) {
|
||||||
|
emptyFile.writeInt(headerLength - 1);
|
||||||
|
} else {
|
||||||
|
emptyFile.writeInt(0x80000001);
|
||||||
|
emptyFile.writeLong(headerLength - 1);
|
||||||
|
}
|
||||||
|
emptyFile.getChannel().force(true);
|
||||||
|
emptyFile.close();
|
||||||
|
|
||||||
|
try {
|
||||||
|
newQueueFile();
|
||||||
|
fail();
|
||||||
|
} catch (IOException ex) {
|
||||||
|
assertThat(ex.getMessage()).isIn(
|
||||||
|
Arrays.asList("File is corrupt; length stored in header (15) is invalid.",
|
||||||
|
"File is corrupt; length stored in header (31) is invalid."));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testNegativeSizeInHeaderThrows() throws IOException {
|
||||||
|
RandomAccessFile emptyFile = new RandomAccessFile(file, "rwd");
|
||||||
|
emptyFile.seek(0);
|
||||||
|
emptyFile.writeInt(-2147483648);
|
||||||
|
emptyFile.setLength(INITIAL_LENGTH);
|
||||||
|
emptyFile.getChannel().force(true);
|
||||||
|
emptyFile.close();
|
||||||
|
|
||||||
|
try {
|
||||||
|
newQueueFile();
|
||||||
|
fail("Should have thrown about bad header length");
|
||||||
|
} catch (IOException ex) {
|
||||||
|
assertThat(ex.getMessage()).isIn(
|
||||||
|
Arrays.asList("File is corrupt; length stored in header (-2147483648) is invalid.",
|
||||||
|
"Unable to read version 0 format. Supported versions are 1 and legacy."));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void removeMultipleDoesNotCorrupt() throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
queue.add(values[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
queue.remove(1);
|
||||||
|
assertThat(queue.size()).isEqualTo(9);
|
||||||
|
assertThat(queue.peek()).isEqualTo(values[1]);
|
||||||
|
|
||||||
|
queue.remove(3);
|
||||||
|
queue = newQueueFile();
|
||||||
|
assertThat(queue.size()).isEqualTo(6);
|
||||||
|
assertThat(queue.peek()).isEqualTo(values[4]);
|
||||||
|
|
||||||
|
queue.remove(6);
|
||||||
|
assertThat(queue.isEmpty()).isTrue();
|
||||||
|
assertThat(queue.peek()).isNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void removeDoesNotCorrupt() throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
|
||||||
|
queue.add(values[127]);
|
||||||
|
byte[] secondStuff = values[253];
|
||||||
|
queue.add(secondStuff);
|
||||||
|
queue.remove();
|
||||||
|
|
||||||
|
queue = newQueueFile();
|
||||||
|
assertThat(queue.peek()).isEqualTo(secondStuff);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void removeFromEmptyFileThrows() throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
|
||||||
|
try {
|
||||||
|
queue.remove();
|
||||||
|
fail("Should have thrown about removing from empty file.");
|
||||||
|
} catch (NoSuchElementException ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void removeZeroFromEmptyFileDoesNothing() throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
queue.remove(0);
|
||||||
|
assertThat(queue.isEmpty()).isTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void removeNegativeNumberOfElementsThrows() throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
queue.add(values[127]);
|
||||||
|
|
||||||
|
try {
|
||||||
|
queue.remove(-1);
|
||||||
|
fail("Should have thrown about removing negative number of elements.");
|
||||||
|
} catch (IllegalArgumentException ex) {
|
||||||
|
assertThat(ex) //
|
||||||
|
.hasMessageThat().isEqualTo("Cannot remove negative (-1) number of elements.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void removeZeroElementsDoesNothing() throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
queue.add(values[127]);
|
||||||
|
|
||||||
|
queue.remove(0);
|
||||||
|
assertThat(queue.size()).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void removeBeyondQueueSizeElementsThrows() throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
queue.add(values[127]);
|
||||||
|
|
||||||
|
try {
|
||||||
|
queue.remove(10);
|
||||||
|
fail("Should have thrown about removing too many elements.");
|
||||||
|
} catch (IllegalArgumentException ex) {
|
||||||
|
assertThat(ex) //
|
||||||
|
.hasMessageThat()
|
||||||
|
.isEqualTo("Cannot remove more elements (10) than present in queue (1).");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void removingBigDamnBlocksErasesEffectively() throws IOException {
|
||||||
|
byte[] bigBoy = new byte[7000];
|
||||||
|
for (int i = 0; i < 7000; i += 100) {
|
||||||
|
System.arraycopy(values[100], 0, bigBoy, i, values[100].length);
|
||||||
|
}
|
||||||
|
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
|
||||||
|
queue.add(bigBoy);
|
||||||
|
byte[] secondStuff = values[123];
|
||||||
|
queue.add(secondStuff);
|
||||||
|
|
||||||
|
// Confirm that bigBoy was in the file before we remove.
|
||||||
|
byte[] data = new byte[bigBoy.length];
|
||||||
|
queue.raf.seek(headerLength + Element.HEADER_LENGTH);
|
||||||
|
queue.raf.readFully(data, 0, bigBoy.length);
|
||||||
|
assertThat(data).isEqualTo(bigBoy);
|
||||||
|
|
||||||
|
queue.remove();
|
||||||
|
|
||||||
|
// Next record is intact
|
||||||
|
assertThat(queue.peek()).isEqualTo(secondStuff);
|
||||||
|
|
||||||
|
// First should have been erased.
|
||||||
|
queue.raf.seek(headerLength + Element.HEADER_LENGTH);
|
||||||
|
queue.raf.readFully(data, 0, bigBoy.length);
|
||||||
|
assertThat(data).isEqualTo(new byte[bigBoy.length]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testAddAndRemoveElements() throws IOException {
|
||||||
|
long start = System.nanoTime();
|
||||||
|
|
||||||
|
Queue<byte[]> expected = new ArrayDeque<>();
|
||||||
|
|
||||||
|
for (int round = 0; round < 5; round++) {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
for (int i = 0; i < N; i++) {
|
||||||
|
queue.add(values[i]);
|
||||||
|
expected.add(values[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Leave N elements in round N, 15 total for 5 rounds. Removing all the
|
||||||
|
// elements would be like starting with an empty queue.
|
||||||
|
for (int i = 0; i < N - round - 1; i++) {
|
||||||
|
assertThat(queue.peek()).isEqualTo(expected.remove());
|
||||||
|
queue.remove();
|
||||||
|
}
|
||||||
|
queue.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove and validate remaining 15 elements.
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
assertThat(queue.size()).isEqualTo(15);
|
||||||
|
assertThat(queue.size()).isEqualTo(expected.size());
|
||||||
|
while (!expected.isEmpty()) {
|
||||||
|
assertThat(queue.peek()).isEqualTo(expected.remove());
|
||||||
|
queue.remove();
|
||||||
|
}
|
||||||
|
queue.close();
|
||||||
|
|
||||||
|
// length() returns 0, but I checked the size w/ 'ls', and it is correct.
|
||||||
|
// assertEquals(65536, file.length());
|
||||||
|
|
||||||
|
logger.info("Ran in " + ((System.nanoTime() - start) / 1000000) + "ms.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Tests queue expansion when the data crosses EOF. */
|
||||||
|
@Test public void testSplitExpansion() throws IOException {
|
||||||
|
// This should result in 3560 bytes.
|
||||||
|
int max = 80;
|
||||||
|
|
||||||
|
Queue<byte[]> expected = new ArrayDeque<>();
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
|
||||||
|
for (int i = 0; i < max; i++) {
|
||||||
|
expected.add(values[i]);
|
||||||
|
queue.add(values[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove all but 1.
|
||||||
|
for (int i = 1; i < max; i++) {
|
||||||
|
assertThat(queue.peek()).isEqualTo(expected.remove());
|
||||||
|
queue.remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should wrap around before expanding.
|
||||||
|
for (int i = 0; i < N; i++) {
|
||||||
|
expected.add(values[i]);
|
||||||
|
queue.add(values[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (!expected.isEmpty()) {
|
||||||
|
assertThat(queue.peek()).isEqualTo(expected.remove());
|
||||||
|
queue.remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
queue.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Tests failed queue expansion when the data crosses EOF. */
|
||||||
|
@Test public void testFailedSplitExpansion() throws IOException {
|
||||||
|
// This should results in a full file, but doesn't trigger an expansion (yet)
|
||||||
|
int max = 86;
|
||||||
|
|
||||||
|
Queue<byte[]> expected = new ArrayDeque<>();
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
|
||||||
|
for (int i = 0; i < max; i++) {
|
||||||
|
expected.add(values[i]);
|
||||||
|
queue.add(values[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove all but 1 value and add back
|
||||||
|
// This should wrap around before expanding.
|
||||||
|
for (int i = 0; i < max - 1; i++) {
|
||||||
|
assertThat(queue.peek()).isEqualTo(expected.remove());
|
||||||
|
queue.remove();
|
||||||
|
|
||||||
|
expected.add(values[i]);
|
||||||
|
queue.add(values[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
//Try to insert element that causes file expansion, but fail
|
||||||
|
long fileLengthBeforeExpansion = file.length();
|
||||||
|
BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
|
||||||
|
queue = newQueueFile(braf);
|
||||||
|
try {
|
||||||
|
queue.add(values[max]);
|
||||||
|
fail();
|
||||||
|
} catch (IOException e) { /* expected */ }
|
||||||
|
|
||||||
|
//Check that the queue continues valid
|
||||||
|
braf.rejectCommit = false;
|
||||||
|
while (!expected.isEmpty()) {
|
||||||
|
assertThat(queue.peek()).isEqualTo(expected.remove());
|
||||||
|
queue.remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
queue.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testFailedAdd() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
queueFile.add(values[253]);
|
||||||
|
queueFile.close();
|
||||||
|
|
||||||
|
BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
|
||||||
|
queueFile = newQueueFile(braf);
|
||||||
|
|
||||||
|
try {
|
||||||
|
queueFile.add(values[252]);
|
||||||
|
fail();
|
||||||
|
} catch (IOException e) { /* expected */ }
|
||||||
|
|
||||||
|
braf.rejectCommit = false;
|
||||||
|
|
||||||
|
// Allow a subsequent add to succeed.
|
||||||
|
queueFile.add(values[251]);
|
||||||
|
|
||||||
|
queueFile.close();
|
||||||
|
|
||||||
|
queueFile = newQueueFile();
|
||||||
|
assertThat(queueFile.size()).isEqualTo(2);
|
||||||
|
assertThat(queueFile.peek()).isEqualTo(values[253]);
|
||||||
|
queueFile.remove();
|
||||||
|
assertThat(queueFile.peek()).isEqualTo(values[251]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testFailedRemoval() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
queueFile.add(values[253]);
|
||||||
|
queueFile.close();
|
||||||
|
|
||||||
|
BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
|
||||||
|
queueFile = newQueueFile(braf);
|
||||||
|
|
||||||
|
try {
|
||||||
|
queueFile.remove();
|
||||||
|
fail();
|
||||||
|
} catch (IOException e) { /* expected */ }
|
||||||
|
|
||||||
|
queueFile.close();
|
||||||
|
|
||||||
|
queueFile = newQueueFile();
|
||||||
|
assertThat(queueFile.size()).isEqualTo(1);
|
||||||
|
assertThat(queueFile.peek()).isEqualTo(values[253]);
|
||||||
|
|
||||||
|
queueFile.add(values[99]);
|
||||||
|
queueFile.remove();
|
||||||
|
assertThat(queueFile.peek()).isEqualTo(values[99]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testFailedExpansion() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
queueFile.add(values[253]);
|
||||||
|
queueFile.close();
|
||||||
|
|
||||||
|
BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
|
||||||
|
queueFile = newQueueFile(braf);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// This should trigger an expansion which should fail.
|
||||||
|
queueFile.add(new byte[8000]);
|
||||||
|
fail();
|
||||||
|
} catch (IOException e) { /* expected */ }
|
||||||
|
|
||||||
|
queueFile.close();
|
||||||
|
|
||||||
|
queueFile = newQueueFile();
|
||||||
|
assertThat(queueFile.size()).isEqualTo(1);
|
||||||
|
assertThat(queueFile.peek()).isEqualTo(values[253]);
|
||||||
|
assertThat(queueFile.fileLength).isEqualTo(4096);
|
||||||
|
|
||||||
|
queueFile.add(values[99]);
|
||||||
|
queueFile.remove();
|
||||||
|
assertThat(queueFile.peek()).isEqualTo(values[99]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exercise a bug where wrapped elements were getting corrupted when the
|
||||||
|
* QueueFile was forced to expand in size and a portion of the final Element
|
||||||
|
* had been wrapped into space at the beginning of the file.
|
||||||
|
*/
|
||||||
|
@Test public void testFileExpansionDoesntCorruptWrappedElements()
|
||||||
|
throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
|
||||||
|
// Create test data - 1k blocks marked consecutively 1, 2, 3, 4 and 5.
|
||||||
|
byte[][] values = new byte[5][];
|
||||||
|
for (int blockNum = 0; blockNum < values.length; blockNum++) {
|
||||||
|
values[blockNum] = new byte[1024];
|
||||||
|
for (int i = 0; i < values[blockNum].length; i++) {
|
||||||
|
values[blockNum][i] = (byte) (blockNum + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// First, add the first two blocks to the queue, remove one leaving a
|
||||||
|
// 1K space at the beginning of the buffer.
|
||||||
|
queue.add(values[0]);
|
||||||
|
queue.add(values[1]);
|
||||||
|
queue.remove();
|
||||||
|
|
||||||
|
// The trailing end of block "4" will be wrapped to the start of the buffer.
|
||||||
|
queue.add(values[2]);
|
||||||
|
queue.add(values[3]);
|
||||||
|
|
||||||
|
// Cause buffer to expand as there isn't space between the end of block "4"
|
||||||
|
// and the start of block "2". Internally the queue should cause block "4"
|
||||||
|
// to be contiguous, but there was a bug where that wasn't happening.
|
||||||
|
queue.add(values[4]);
|
||||||
|
|
||||||
|
// Make sure values are not corrupted, specifically block "4" that wasn't
|
||||||
|
// being made contiguous in the version with the bug.
|
||||||
|
for (int blockNum = 1; blockNum < values.length; blockNum++) {
|
||||||
|
byte[] value = queue.peek();
|
||||||
|
queue.remove();
|
||||||
|
|
||||||
|
for (int i = 0; i < value.length; i++) {
|
||||||
|
assertThat(value[i]).named("Block %1$d corrupted at byte index %2$d.", blockNum + 1, i)
|
||||||
|
.isEqualTo((byte) (blockNum + 1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
queue.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exercise a bug where wrapped elements were getting corrupted when the
|
||||||
|
* QueueFile was forced to expand in size and a portion of the final Element
|
||||||
|
* had been wrapped into space at the beginning of the file - if multiple
|
||||||
|
* Elements have been written to empty buffer space at the start does the
|
||||||
|
* expansion correctly update all their positions?
|
||||||
|
*/
|
||||||
|
@Test public void testFileExpansionCorrectlyMovesElements() throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
|
||||||
|
// Create test data - 1k blocks marked consecutively 1, 2, 3, 4 and 5.
|
||||||
|
byte[][] values = new byte[5][];
|
||||||
|
for (int blockNum = 0; blockNum < values.length; blockNum++) {
|
||||||
|
values[blockNum] = new byte[1024];
|
||||||
|
for (int i = 0; i < values[blockNum].length; i++) {
|
||||||
|
values[blockNum][i] = (byte) (blockNum + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// smaller data elements
|
||||||
|
byte[][] smaller = new byte[3][];
|
||||||
|
for (int blockNum = 0; blockNum < smaller.length; blockNum++) {
|
||||||
|
smaller[blockNum] = new byte[256];
|
||||||
|
for (int i = 0; i < smaller[blockNum].length; i++) {
|
||||||
|
smaller[blockNum][i] = (byte) (blockNum + 6);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// First, add the first two blocks to the queue, remove one leaving a
|
||||||
|
// 1K space at the beginning of the buffer.
|
||||||
|
queue.add(values[0]);
|
||||||
|
queue.add(values[1]);
|
||||||
|
queue.remove();
|
||||||
|
|
||||||
|
// The trailing end of block "4" will be wrapped to the start of the buffer.
|
||||||
|
queue.add(values[2]);
|
||||||
|
queue.add(values[3]);
|
||||||
|
|
||||||
|
// Now fill in some space with smaller blocks, none of which will cause
|
||||||
|
// an expansion.
|
||||||
|
queue.add(smaller[0]);
|
||||||
|
queue.add(smaller[1]);
|
||||||
|
queue.add(smaller[2]);
|
||||||
|
|
||||||
|
// Cause buffer to expand as there isn't space between the end of the
|
||||||
|
// smaller block "8" and the start of block "2". Internally the queue
|
||||||
|
// should cause all of tbe smaller blocks, and the trailing end of
|
||||||
|
// block "5" to be moved to the end of the file.
|
||||||
|
queue.add(values[4]);
|
||||||
|
|
||||||
|
byte[] expectedBlockNumbers = {2, 3, 4, 6, 7, 8, 5};
|
||||||
|
|
||||||
|
// Make sure values are not corrupted, specifically block "4" that wasn't
|
||||||
|
// being made contiguous in the version with the bug.
|
||||||
|
for (byte expectedBlockNumber : expectedBlockNumbers) {
|
||||||
|
byte[] value = queue.peek();
|
||||||
|
queue.remove();
|
||||||
|
|
||||||
|
for (int i = 0; i < value.length; i++) {
|
||||||
|
assertThat(value[i]).named("Block %1$d corrupted at byte index %2$d.", expectedBlockNumber,
|
||||||
|
i).isEqualTo(expectedBlockNumber);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
queue.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void removingElementZeroesData() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile(true);
|
||||||
|
queueFile.add(values[4]);
|
||||||
|
queueFile.remove();
|
||||||
|
queueFile.close();
|
||||||
|
|
||||||
|
BufferedSource source = Okio.buffer(Okio.source(file));
|
||||||
|
source.skip(headerLength);
|
||||||
|
source.skip(Element.HEADER_LENGTH);
|
||||||
|
assertThat(source.readByteString(4).hex()).isEqualTo("00000000");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void removingElementDoesNotZeroData() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile(false);
|
||||||
|
queueFile.add(values[4]);
|
||||||
|
queueFile.remove();
|
||||||
|
queueFile.close();
|
||||||
|
|
||||||
|
BufferedSource source = Okio.buffer(Okio.source(file));
|
||||||
|
source.skip(headerLength);
|
||||||
|
source.skip(Element.HEADER_LENGTH);
|
||||||
|
assertThat(source.readByteString(4).hex()).isEqualTo("04030201");
|
||||||
|
|
||||||
|
source.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exercise a bug where file expansion would leave garbage at the start of the header
|
||||||
|
* and after the last element.
|
||||||
|
*/
|
||||||
|
@Test public void testFileExpansionCorrectlyZeroesData()
|
||||||
|
throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
|
||||||
|
// Create test data - 1k blocks marked consecutively 1, 2, 3, 4 and 5.
|
||||||
|
byte[][] values = new byte[5][];
|
||||||
|
for (int blockNum = 0; blockNum < values.length; blockNum++) {
|
||||||
|
values[blockNum] = new byte[1024];
|
||||||
|
for (int i = 0; i < values[blockNum].length; i++) {
|
||||||
|
values[blockNum][i] = (byte) (blockNum + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// First, add the first two blocks to the queue, remove one leaving a
|
||||||
|
// 1K space at the beginning of the buffer.
|
||||||
|
queue.add(values[0]);
|
||||||
|
queue.add(values[1]);
|
||||||
|
queue.remove();
|
||||||
|
|
||||||
|
// The trailing end of block "4" will be wrapped to the start of the buffer.
|
||||||
|
queue.add(values[2]);
|
||||||
|
queue.add(values[3]);
|
||||||
|
|
||||||
|
// Cause buffer to expand as there isn't space between the end of block "4"
|
||||||
|
// and the start of block "2". Internally the queue will cause block "4"
|
||||||
|
// to be contiguous. There was a bug where the start of the buffer still
|
||||||
|
// contained the tail end of block "4", and garbage was copied after the tail
|
||||||
|
// end of the last element.
|
||||||
|
queue.add(values[4]);
|
||||||
|
|
||||||
|
// Read from header to first element and make sure it's zeroed.
|
||||||
|
int firstElementPadding = Element.HEADER_LENGTH + 1024;
|
||||||
|
byte[] data = new byte[firstElementPadding];
|
||||||
|
queue.raf.seek(headerLength);
|
||||||
|
queue.raf.readFully(data, 0, firstElementPadding);
|
||||||
|
assertThat(data).isEqualTo(new byte[firstElementPadding]);
|
||||||
|
|
||||||
|
// Read from the last element to the end and make sure it's zeroed.
|
||||||
|
int endOfLastElement = headerLength + firstElementPadding + 4 * (Element.HEADER_LENGTH + 1024);
|
||||||
|
int readLength = (int) (queue.raf.length() - endOfLastElement);
|
||||||
|
data = new byte[readLength];
|
||||||
|
queue.raf.seek(endOfLastElement);
|
||||||
|
queue.raf.readFully(data, 0, readLength);
|
||||||
|
assertThat(data).isEqualTo(new byte[readLength]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exercise a bug where an expanding queue file where the start and end positions
|
||||||
|
* are the same causes corruption.
|
||||||
|
*/
|
||||||
|
@Test public void testSaturatedFileExpansionMovesElements() throws IOException {
|
||||||
|
QueueFile queue = newQueueFile();
|
||||||
|
|
||||||
|
// Create test data - 1016-byte blocks marked consecutively 1, 2, 3, 4, 5 and 6,
|
||||||
|
// four of which perfectly fill the queue file, taking into account the file header
|
||||||
|
// and the item headers.
|
||||||
|
// Each item is of length
|
||||||
|
// (QueueFile.INITIAL_LENGTH - headerLength) / 4 - element_header_length
|
||||||
|
// = 1016 bytes
|
||||||
|
byte[][] values = new byte[6][];
|
||||||
|
for (int blockNum = 0; blockNum < values.length; blockNum++) {
|
||||||
|
values[blockNum] = new byte[(INITIAL_LENGTH - headerLength) / 4 - Element.HEADER_LENGTH];
|
||||||
|
for (int i = 0; i < values[blockNum].length; i++) {
|
||||||
|
values[blockNum][i] = (byte) (blockNum + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Saturate the queue file
|
||||||
|
queue.add(values[0]);
|
||||||
|
queue.add(values[1]);
|
||||||
|
queue.add(values[2]);
|
||||||
|
queue.add(values[3]);
|
||||||
|
|
||||||
|
// Remove an element and add a new one so that the position of the start and
|
||||||
|
// end of the queue are equal
|
||||||
|
queue.remove();
|
||||||
|
queue.add(values[4]);
|
||||||
|
|
||||||
|
// Cause the queue file to expand
|
||||||
|
queue.add(values[5]);
|
||||||
|
|
||||||
|
// Make sure values are not corrupted
|
||||||
|
for (int i = 1; i < 6; i++) {
|
||||||
|
assertThat(queue.peek()).isEqualTo(values[i]);
|
||||||
|
queue.remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
queue.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exercise a bug where opening a queue whose first or last element's header
|
||||||
|
* was non contiguous throws an {@link java.io.EOFException}.
|
||||||
|
*/
|
||||||
|
@Test public void testReadHeadersFromNonContiguousQueueWorks() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
|
||||||
|
// Fill the queue up to `length - 2` (i.e. remainingBytes() == 2).
|
||||||
|
for (int i = 0; i < 15; i++) {
|
||||||
|
queueFile.add(values[N - 1]);
|
||||||
|
}
|
||||||
|
queueFile.add(values[219]);
|
||||||
|
|
||||||
|
// Remove first item so we have room to add another one without growing the file.
|
||||||
|
queueFile.remove();
|
||||||
|
|
||||||
|
// Add any element element and close the queue.
|
||||||
|
queueFile.add(values[6]);
|
||||||
|
int queueSize = queueFile.size();
|
||||||
|
queueFile.close();
|
||||||
|
|
||||||
|
// File should not be corrupted.
|
||||||
|
QueueFile queueFile2 = newQueueFile();
|
||||||
|
assertThat(queueFile2.size()).isEqualTo(queueSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIterator() throws IOException {
|
||||||
|
byte[] data = values[10];
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
for (int j = 0; j < i; j++) {
|
||||||
|
queueFile.add(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
int saw = 0;
|
||||||
|
for (byte[] element : queueFile) {
|
||||||
|
assertThat(element).isEqualTo(data);
|
||||||
|
saw++;
|
||||||
|
}
|
||||||
|
assertThat(saw).isEqualTo(i);
|
||||||
|
queueFile.close();
|
||||||
|
file.delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorNextThrowsWhenEmpty() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
|
||||||
|
Iterator<byte[]> iterator = queueFile.iterator();
|
||||||
|
|
||||||
|
try {
|
||||||
|
iterator.next();
|
||||||
|
fail();
|
||||||
|
} catch (NoSuchElementException ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorNextThrowsWhenExhausted() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
queueFile.add(values[0]);
|
||||||
|
|
||||||
|
Iterator<byte[]> iterator = queueFile.iterator();
|
||||||
|
iterator.next();
|
||||||
|
|
||||||
|
try {
|
||||||
|
iterator.next();
|
||||||
|
fail();
|
||||||
|
} catch (NoSuchElementException ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorRemove() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
for (int i = 0; i < 15; i++) {
|
||||||
|
queueFile.add(values[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Iterator<byte[]> iterator = queueFile.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
iterator.next();
|
||||||
|
iterator.remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(queueFile).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorRemoveDisallowsConcurrentModification() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
for (int i = 0; i < 15; i++) {
|
||||||
|
queueFile.add(values[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Iterator<byte[]> iterator = queueFile.iterator();
|
||||||
|
iterator.next();
|
||||||
|
queueFile.remove();
|
||||||
|
try {
|
||||||
|
iterator.remove();
|
||||||
|
fail();
|
||||||
|
} catch (ConcurrentModificationException ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorHasNextDisallowsConcurrentModification() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
for (int i = 0; i < 15; i++) {
|
||||||
|
queueFile.add(values[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Iterator<byte[]> iterator = queueFile.iterator();
|
||||||
|
iterator.next();
|
||||||
|
queueFile.remove();
|
||||||
|
try {
|
||||||
|
iterator.hasNext();
|
||||||
|
fail();
|
||||||
|
} catch (ConcurrentModificationException ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorDisallowsConcurrentModificationWithClear() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
for (int i = 0; i < 15; i++) {
|
||||||
|
queueFile.add(values[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Iterator<byte[]> iterator = queueFile.iterator();
|
||||||
|
iterator.next();
|
||||||
|
queueFile.clear();
|
||||||
|
try {
|
||||||
|
iterator.hasNext();
|
||||||
|
fail();
|
||||||
|
} catch (ConcurrentModificationException ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testIteratorOnlyRemovesFromHead() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
for (int i = 0; i < 15; i++) {
|
||||||
|
queueFile.add(values[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Iterator<byte[]> iterator = queueFile.iterator();
|
||||||
|
iterator.next();
|
||||||
|
iterator.next();
|
||||||
|
|
||||||
|
try {
|
||||||
|
iterator.remove();
|
||||||
|
fail();
|
||||||
|
} catch (UnsupportedOperationException ex) {
|
||||||
|
assertThat(ex).hasMessageThat().isEqualTo("Removal is only permitted from the head.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void iteratorThrowsIOException() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
queueFile.add(values[253]);
|
||||||
|
queueFile.close();
|
||||||
|
|
||||||
|
final class BrokenRandomAccessFile extends RandomAccessFile {
|
||||||
|
boolean fail = false;
|
||||||
|
|
||||||
|
BrokenRandomAccessFile(File file, String mode)
|
||||||
|
throws FileNotFoundException {
|
||||||
|
super(file, mode);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void write(byte[] b, int off, int len) throws IOException {
|
||||||
|
if (fail) {
|
||||||
|
throw new IOException();
|
||||||
|
}
|
||||||
|
super.write(b, off, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int read(byte[] b, int off, int len) throws IOException {
|
||||||
|
if (fail) {
|
||||||
|
throw new IOException();
|
||||||
|
}
|
||||||
|
return super.read(b, off, len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
|
||||||
|
queueFile = newQueueFile(braf);
|
||||||
|
Iterator<byte[]> iterator = queueFile.iterator();
|
||||||
|
|
||||||
|
braf.fail = true;
|
||||||
|
try {
|
||||||
|
iterator.next();
|
||||||
|
fail();
|
||||||
|
} catch (Exception ioe) {
|
||||||
|
assertThat(ioe).isInstanceOf(IOException.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
braf.fail = false;
|
||||||
|
iterator.next();
|
||||||
|
|
||||||
|
braf.fail = true;
|
||||||
|
try {
|
||||||
|
iterator.remove();
|
||||||
|
fail();
|
||||||
|
} catch (Exception ioe) {
|
||||||
|
assertThat(ioe).isInstanceOf(IOException.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void queueToString() throws IOException {
|
||||||
|
QueueFile queueFile = newQueueFile();
|
||||||
|
for (int i = 0; i < 15; i++) {
|
||||||
|
queueFile.add(values[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (forceLegacy) {
|
||||||
|
assertThat(queueFile.toString()).contains("zero=true, versioned=false, length=4096,"
|
||||||
|
+ " size=15,"
|
||||||
|
+ " first=Element[position=16, length=0], last=Element[position=163, length=14]}");
|
||||||
|
} else {
|
||||||
|
assertThat(queueFile.toString()).contains("zero=true, versioned=true, length=4096,"
|
||||||
|
+ " size=15,"
|
||||||
|
+ " first=Element[position=32, length=0], last=Element[position=179, length=14]}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A RandomAccessFile that can break when you go to write the COMMITTED
|
||||||
|
* status.
|
||||||
|
*/
|
||||||
|
static final class BrokenRandomAccessFile extends RandomAccessFile {
|
||||||
|
boolean rejectCommit = true;
|
||||||
|
|
||||||
|
BrokenRandomAccessFile(File file, String mode)
|
||||||
|
throws FileNotFoundException {
|
||||||
|
super(file, mode);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void write(byte[] b, int off, int len) throws IOException {
|
||||||
|
if (rejectCommit && getFilePointer() == 0) {
|
||||||
|
throw new IOException("No commit for you!");
|
||||||
|
}
|
||||||
|
super.write(b, off, len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,46 @@
|
||||||
|
// Copyright 2012 Square, Inc.
|
||||||
|
package com.squareup.tape2;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import okio.BufferedSink;
|
||||||
|
import okio.Okio;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
final class QueueTestUtils {
|
||||||
|
static final String TRUNCATED_ONE_ENTRY_SERIALIZED_QUEUE =
|
||||||
|
"/truncated-one-entry-serialized-queue";
|
||||||
|
static final String TRUNCATED_EMPTY_SERIALIZED_QUEUE = "/truncated-empty-serialized-queue";
|
||||||
|
static final String ONE_ENTRY_SERIALIZED_QUEUE = "/one-entry-serialized-queue";
|
||||||
|
static final String EMPTY_SERIALIZED_QUEUE = "/empty-serialized-queue";
|
||||||
|
static final String FRESH_SERIALIZED_QUEUE = "/fresh-serialized-queue";
|
||||||
|
|
||||||
|
static File copyTestFile(String file) throws IOException {
|
||||||
|
File newFile = File.createTempFile(file, "test");
|
||||||
|
InputStream in = QueueTestUtils.class.getResourceAsStream(file);
|
||||||
|
try (BufferedSink sink = Okio.buffer(Okio.sink(newFile))) {
|
||||||
|
sink.writeAll(Okio.source(in));
|
||||||
|
}
|
||||||
|
assertTrue(newFile.exists());
|
||||||
|
return newFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** File that suppresses deletion. */
|
||||||
|
static class UndeletableFile extends File {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
UndeletableFile(String name) {
|
||||||
|
super(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean delete() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private QueueTestUtils() {
|
||||||
|
throw new AssertionError("No instances.");
|
||||||
|
}
|
||||||
|
}
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -15,4 +15,5 @@ include 'datastructures-json-micro'
|
||||||
include 'datastructures-json-minimal'
|
include 'datastructures-json-minimal'
|
||||||
include 'datastructures-json-noggit'
|
include 'datastructures-json-noggit'
|
||||||
include 'datastructures-json-simple'
|
include 'datastructures-json-simple'
|
||||||
|
include 'datastructures-queue-tape'
|
||||||
include 'benchmark'
|
include 'benchmark'
|
||||||
|
|
Loading…
Reference in a new issue