enable tests, add NPE guards to DefaultBulkProcessor

This commit is contained in:
Jörg Prante 2019-05-03 11:31:00 +02:00
parent 43c983f2a9
commit 979b06fbf2
10 changed files with 107 additions and 112 deletions

View file

@ -65,7 +65,7 @@ subprojects {
}
test {
enabled = false
enabled = true
useJUnitPlatform()
// we MUST use this hack because of Elasticsearch 2.2.1 Lucene 5.4.1 MMapDirectory unmap() hackery
doFirst {

View file

@ -20,8 +20,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request
@ -31,8 +29,6 @@ import java.util.logging.Logger;
*/
public class DefaultBulkProcessor implements BulkProcessor {
private static final Logger logger = Logger.getLogger(DefaultBulkProcessor.class.getName());
private final int bulkActions;
private final long bulkSize;
@ -56,9 +52,6 @@ public class DefaultBulkProcessor implements BulkProcessor {
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
if (listener == null) {
throw new IllegalArgumentException();
}
this.bulkRequestHandler = concurrentRequests == 0 ?
new SyncBulkRequestHandler(client, listener) :
new AsyncBulkRequestHandler(client, listener, concurrentRequests);
@ -76,9 +69,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
}
public static Builder builder(ElasticsearchClient client, Listener listener) {
if (client == null) {
throw new NullPointerException("The client you specified while building a BulkProcessor is null");
}
Objects.requireNonNull(client, "The client you specified while building a BulkProcessor is null");
Objects.requireNonNull(listener, "A listener for the BulkProcessor is required but null");
return new Builder(client, listener);
}
@ -91,6 +83,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
*/
@Override
public synchronized boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException {
Objects.requireNonNull(unit, "A time unit is required for awaitFlush() but null");
if (closed) {
return true;
}
@ -99,7 +92,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
execute();
}
// wait for all bulk responses
return this.bulkRequestHandler.close(timeout, unit);
return bulkRequestHandler.close(timeout, unit);
}
/**
@ -119,18 +112,19 @@ public class DefaultBulkProcessor implements BulkProcessor {
*/
@Override
public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
Objects.requireNonNull(unit, "A time unit is required for awaitCLose() but null");
if (closed) {
return true;
}
closed = true;
if (this.scheduledFuture != null) {
FutureUtils.cancel(this.scheduledFuture);
this.scheduler.shutdown();
if (scheduledFuture != null) {
FutureUtils.cancel(scheduledFuture);
scheduler.shutdown();
}
if (bulkRequest.numberOfActions() > 0) {
execute();
}
return this.bulkRequestHandler.close(timeout, unit);
return bulkRequestHandler.close(timeout, unit);
}
/**
@ -213,8 +207,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
private boolean isOverTheLimit() {
return bulkActions != -1 &&
bulkRequest.numberOfActions() >= bulkActions ||
bulkSize != -1 &&
bulkRequest.estimatedSizeInBytes() >= bulkSize;
bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize;
}
/**
@ -342,6 +335,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final DefaultBulkProcessor.Listener listener;
SyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener) {
Objects.requireNonNull(listener, "A listener is required for SyncBulkRequestHandler but null");
this.client = client;
this.listener = listener;
}
@ -378,6 +372,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final int concurrentRequests;
private AsyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener, int concurrentRequests) {
Objects.requireNonNull(listener, "A listener is required for AsyncBulkRequestHandler but null");
this.client = client;
this.listener = listener;
this.concurrentRequests = concurrentRequests;
@ -426,10 +421,6 @@ public class DefaultBulkProcessor implements BulkProcessor {
@Override
public boolean close(long timeout, TimeUnit unit) throws InterruptedException {
logger.log(Level.INFO, "semaphore=" + semaphore +
" concurrentRequests=" + concurrentRequests +
" timeout=" + timeout +
" unit=" + unit);
if (semaphore.tryAcquire(concurrentRequests, timeout, unit)) {
semaphore.release(concurrentRequests);
return true;

View file

@ -19,8 +19,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
@ -39,7 +39,7 @@ import java.util.Random;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
public class TestExtension implements ParameterResolver, BeforeAllCallback, AfterAllCallback {
public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
private static final Logger logger = LogManager.getLogger("test");
@ -73,18 +73,15 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
throws ParameterResolutionException {
setHome(System.getProperty("path.home") + "/" + getRandomString(8));
setClusterName("test-cluster-" + System.getProperty("user.name"));
return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create());
}
@Override
public void beforeAll(ExtensionContext context) throws Exception {
public void beforeEach(ExtensionContext context) throws Exception {
Helper helper = context.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class);
setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
setClusterName("test-cluster-" + System.getProperty("user.name"));
logger.info("starting cluster");
deleteFiles(Paths.get(getHome() + "/data"));
logger.info("data files wiped");
Thread.sleep(2000L); // let OS commit changes
helper.startNode("1");
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
@ -114,9 +111,11 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
}
@Override
public void afterAll(ExtensionContext context) throws Exception {
public void afterEach(ExtensionContext context) throws Exception {
closeNodes();
deleteFiles(Paths.get(getHome() + "/data"));
logger.info("data files wiped");
Thread.sleep(2000L); // let OS commit changes
}
private void setClusterName(String cluster) {
@ -169,6 +168,15 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
}
}
private String getRandomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
}
return new String(buf);
}
private Helper create() {
return new Helper();
}
@ -186,6 +194,14 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
buildNode(id).start();
}
ElasticsearchClient client(String id) {
return clients.get(id);
}
String randomString(int n) {
return getRandomString(n);
}
private Node buildNode(String id) {
Settings nodeSettings = settingsBuilder()
.put(getNodeSettings())
@ -198,19 +214,5 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
logger.info("clients={}", clients);
return node;
}
String randomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
}
return new String(buf);
}
ElasticsearchClient client(String id) {
return clients.get(id);
}
}
}

View file

@ -44,7 +44,7 @@ class SmokeTest {
client.waitForRecovery("test_smoke", 10L, TimeUnit.SECONDS);
client.delete("test_smoke", "1");
client.deleteIndex("test_smoke");
IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test_smoke", Settings.settingsBuilder()
IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test_smoke_2", Settings.settingsBuilder()
.build());
assertEquals(0, indexDefinition.getReplicaLevel());
client.newIndex(indexDefinition);

View file

@ -19,8 +19,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
@ -39,7 +39,7 @@ import java.util.Random;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
public class TestExtension implements ParameterResolver, BeforeAllCallback, AfterAllCallback {
public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
private static final Logger logger = LogManager.getLogger("test");
@ -73,17 +73,14 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
throws ParameterResolutionException {
setHome(System.getProperty("path.home") + "/" + getRandomString(8));
setClusterName("test-cluster-" + System.getProperty("user.name"));
return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create());
}
@Override
public void beforeAll(ExtensionContext context) throws Exception {
public void beforeEach(ExtensionContext context) throws Exception {
Helper helper = context.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class);
setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
setClusterName("test-cluster-" + System.getProperty("user.name"));
deleteFiles(Paths.get(getHome() + "/data"));
logger.info("data files wiped");
Thread.sleep(2000L); // let OS commit changes
logger.info("starting cluster");
helper.startNode("1");
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
@ -114,9 +111,11 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
}
@Override
public void afterAll(ExtensionContext context) throws Exception {
public void afterEach(ExtensionContext context) throws Exception {
closeNodes();
deleteFiles(Paths.get(getHome() + "/data"));
logger.info("data files wiped");
Thread.sleep(2000L); // let OS commit changes
}
private void setClusterName(String cluster) {
@ -169,6 +168,15 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
}
}
private String getRandomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
}
return new String(buf);
}
private Helper create() {
return new Helper();
}
@ -179,6 +187,18 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
buildNode(id).start();
}
ElasticsearchClient client(String id) {
return clients.get(id);
}
String getCluster() {
return getClusterName();
}
String randomString(int n) {
return getRandomString(n);
}
private Node buildNode(String id) {
Settings nodeSettings = settingsBuilder()
.put("cluster.name", getClusterName())
@ -192,22 +212,5 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
logger.info("clients={}", clients);
return node;
}
String randomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
}
return new String(buf);
}
ElasticsearchClient client(String id) {
return clients.get(id);
}
String getCluster() {
return getClusterName();
}
}
}

View file

@ -13,7 +13,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.common.ClientBuilder;
@ -43,7 +42,6 @@ class ClientTest {
ClientTest(TestExtension.Helper helper) {
this.helper = helper;
helper.startNode("2");
}
@Test
@ -145,7 +143,6 @@ class ClientTest {
}
@Test
@Disabled
void testThreadedRandomDocs() throws Exception {
int maxthreads = Runtime.getRuntime().availableProcessors();
long maxactions = MAX_ACTIONS_PER_REQUEST;

View file

@ -25,9 +25,9 @@ class DuplicateIDTest {
private final static Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName());
private final static Long MAX_ACTIONS_PER_REQUEST = 10L;
private final static Long MAX_ACTIONS_PER_REQUEST = 100L;
private final static Long ACTIONS = 5L;
private final static Long ACTIONS = 50L;
private final TestExtension.Helper helper;

View file

@ -33,7 +33,7 @@ class IndexShiftTest {
}
@Test
void testIndexAlias() throws Exception {
void testIndexShift() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(helper.getTransportSettings()).build();

View file

@ -45,7 +45,7 @@ class SmokeTest extends TestExtension {
client.waitForRecovery("test_smoke", 10L, TimeUnit.SECONDS);
client.delete("test_smoke", "1");
client.deleteIndex("test_smoke");
IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test2", Settings.settingsBuilder()
IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test_smoke_2", Settings.settingsBuilder()
.build());
assertEquals(0, indexDefinition.getReplicaLevel());
client.newIndex(indexDefinition);

View file

@ -19,8 +19,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
@ -39,7 +39,7 @@ import java.util.Random;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
public class TestExtension implements ParameterResolver, BeforeAllCallback, AfterAllCallback {
public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
private static final Logger logger = LogManager.getLogger("test");
@ -59,7 +59,7 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
private int port;
private static final String key = "es-instance";
private static final String key = "es-test-instance";
private static final ExtensionContext.Namespace ns =
ExtensionContext.Namespace.create(TestExtension.class);
@ -73,17 +73,14 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
throws ParameterResolutionException {
return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create());
setHome(System.getProperty("path.home") + "/" + getRandomString(8));
setClusterName("test-cluster-" + System.getProperty("user.name"));
return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class);
}
@Override
public void beforeAll(ExtensionContext context) throws Exception {
Helper helper = context.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class);
setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
setClusterName("test-cluster-" + System.getProperty("user.name"));
deleteFiles(Paths.get(getHome() + "/data"));
logger.info("data files wiped: " + getHome());
Thread.sleep(2000L); // let OS commit changes
public void beforeEach(ExtensionContext extensionContext) throws Exception {
Helper helper = extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class);
logger.info("starting cluster");
helper.startNode("1");
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
@ -114,10 +111,11 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
}
@Override
public void afterAll(ExtensionContext context) throws Exception {
public void afterEach(ExtensionContext context) throws Exception {
closeNodes();
deleteFiles(Paths.get(getHome() + "/data"));
logger.info("cluster stopped");
logger.info("data files wiped: " + getHome());
Thread.sleep(2000L); // let OS commit changes
}
private void setClusterName(String cluster) {
@ -170,6 +168,15 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
}
}
private String getRandomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
}
return new String(buf);
}
private Helper create() {
return new Helper();
}
@ -196,6 +203,18 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
buildNode(id).start();
}
ElasticsearchClient client(String id) {
return clients.get(id);
}
String getCluster() {
return getClusterName();
}
String randomString(int n) {
return getRandomString(n);
}
private Node buildNode(String id) {
Settings nodeSettings = settingsBuilder()
.put(getNodeSettings())
@ -208,22 +227,5 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte
logger.info("clients={}", clients);
return node;
}
String randomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
}
return new String(buf);
}
ElasticsearchClient client(String id) {
return clients.get(id);
}
String getCluster() {
return getClusterName();
}
}
}