From 979b06fbf2c10d530dcca3c78c95976f363c724a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Fri, 3 May 2019 11:31:00 +0200 Subject: [PATCH] enable tests, add NPE guards to DefaultBulkProcessor --- build.gradle | 2 +- .../xbib/elx/common/DefaultBulkProcessor.java | 33 ++++------ .../xbib/elx/common/test/TestExtension.java | 50 ++++++++------- .../org/xbib/elx/node/test/SmokeTest.java | 2 +- .../org/xbib/elx/node/test/TestExtension.java | 57 +++++++++-------- .../xbib/elx/transport/test/ClientTest.java | 3 - .../elx/transport/test/DuplicateIDTest.java | 4 +- .../elx/transport/test/IndexShiftTest.java | 2 +- .../xbib/elx/transport/test/SmokeTest.java | 2 +- .../elx/transport/test/TestExtension.java | 64 ++++++++++--------- 10 files changed, 107 insertions(+), 112 deletions(-) diff --git a/build.gradle b/build.gradle index df2dfa7..5273aa6 100644 --- a/build.gradle +++ b/build.gradle @@ -65,7 +65,7 @@ subprojects { } test { - enabled = false + enabled = true useJUnitPlatform() // we MUST use this hack because of Elasticsearch 2.2.1 Lucene 5.4.1 MMapDirectory unmap() hackery doFirst { diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index c633ad1..74915c9 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -20,8 +20,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; /** * A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request @@ -31,8 +29,6 @@ import java.util.logging.Logger; */ public class DefaultBulkProcessor implements BulkProcessor { - private static final Logger logger = Logger.getLogger(DefaultBulkProcessor.class.getName()); - private final int bulkActions; private final long bulkSize; @@ -56,9 +52,6 @@ public class DefaultBulkProcessor implements BulkProcessor { this.bulkActions = bulkActions; this.bulkSize = bulkSize.getBytes(); this.bulkRequest = new BulkRequest(); - if (listener == null) { - throw new IllegalArgumentException(); - } this.bulkRequestHandler = concurrentRequests == 0 ? new SyncBulkRequestHandler(client, listener) : new AsyncBulkRequestHandler(client, listener, concurrentRequests); @@ -76,9 +69,8 @@ public class DefaultBulkProcessor implements BulkProcessor { } public static Builder builder(ElasticsearchClient client, Listener listener) { - if (client == null) { - throw new NullPointerException("The client you specified while building a BulkProcessor is null"); - } + Objects.requireNonNull(client, "The client you specified while building a BulkProcessor is null"); + Objects.requireNonNull(listener, "A listener for the BulkProcessor is required but null"); return new Builder(client, listener); } @@ -91,6 +83,7 @@ public class DefaultBulkProcessor implements BulkProcessor { */ @Override public synchronized boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException { + Objects.requireNonNull(unit, "A time unit is required for awaitFlush() but null"); if (closed) { return true; } @@ -99,7 +92,7 @@ public class DefaultBulkProcessor implements BulkProcessor { execute(); } // wait for all bulk responses - return this.bulkRequestHandler.close(timeout, unit); + return bulkRequestHandler.close(timeout, unit); } /** @@ -119,18 +112,19 @@ public class DefaultBulkProcessor implements BulkProcessor { */ @Override public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { + Objects.requireNonNull(unit, "A time unit is required for awaitCLose() but null"); if (closed) { return true; } closed = true; - if (this.scheduledFuture != null) { - FutureUtils.cancel(this.scheduledFuture); - this.scheduler.shutdown(); + if (scheduledFuture != null) { + FutureUtils.cancel(scheduledFuture); + scheduler.shutdown(); } if (bulkRequest.numberOfActions() > 0) { execute(); } - return this.bulkRequestHandler.close(timeout, unit); + return bulkRequestHandler.close(timeout, unit); } /** @@ -213,8 +207,7 @@ public class DefaultBulkProcessor implements BulkProcessor { private boolean isOverTheLimit() { return bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions || - bulkSize != -1 && - bulkRequest.estimatedSizeInBytes() >= bulkSize; + bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize; } /** @@ -342,6 +335,7 @@ public class DefaultBulkProcessor implements BulkProcessor { private final DefaultBulkProcessor.Listener listener; SyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener) { + Objects.requireNonNull(listener, "A listener is required for SyncBulkRequestHandler but null"); this.client = client; this.listener = listener; } @@ -378,6 +372,7 @@ public class DefaultBulkProcessor implements BulkProcessor { private final int concurrentRequests; private AsyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener, int concurrentRequests) { + Objects.requireNonNull(listener, "A listener is required for AsyncBulkRequestHandler but null"); this.client = client; this.listener = listener; this.concurrentRequests = concurrentRequests; @@ -426,10 +421,6 @@ public class DefaultBulkProcessor implements BulkProcessor { @Override public boolean close(long timeout, TimeUnit unit) throws InterruptedException { - logger.log(Level.INFO, "semaphore=" + semaphore + - " concurrentRequests=" + concurrentRequests + - " timeout=" + timeout + - " unit=" + unit); if (semaphore.tryAcquire(concurrentRequests, timeout, unit)) { semaphore.release(concurrentRequests); return true; diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java b/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java index fdf65e6..6186f8c 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java @@ -19,8 +19,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.Node; -import org.junit.jupiter.api.extension.AfterAllCallback; -import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ParameterContext; import org.junit.jupiter.api.extension.ParameterResolutionException; @@ -39,7 +39,7 @@ import java.util.Random; import static org.elasticsearch.common.settings.Settings.settingsBuilder; -public class TestExtension implements ParameterResolver, BeforeAllCallback, AfterAllCallback { +public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback { private static final Logger logger = LogManager.getLogger("test"); @@ -73,18 +73,15 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte @Override public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + setHome(System.getProperty("path.home") + "/" + getRandomString(8)); + setClusterName("test-cluster-" + System.getProperty("user.name")); return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create()); } @Override - public void beforeAll(ExtensionContext context) throws Exception { + public void beforeEach(ExtensionContext context) throws Exception { Helper helper = context.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class); - setHome(System.getProperty("path.home") + "/" + helper.randomString(8)); - setClusterName("test-cluster-" + System.getProperty("user.name")); logger.info("starting cluster"); - deleteFiles(Paths.get(getHome() + "/data")); - logger.info("data files wiped"); - Thread.sleep(2000L); // let OS commit changes helper.startNode("1"); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); @@ -114,9 +111,11 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte } @Override - public void afterAll(ExtensionContext context) throws Exception { + public void afterEach(ExtensionContext context) throws Exception { closeNodes(); deleteFiles(Paths.get(getHome() + "/data")); + logger.info("data files wiped"); + Thread.sleep(2000L); // let OS commit changes } private void setClusterName(String cluster) { @@ -169,6 +168,15 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte } } + private String getRandomString(int len) { + final char[] buf = new char[len]; + final int n = numbersAndLetters.length - 1; + for (int i = 0; i < buf.length; i++) { + buf[i] = numbersAndLetters[random.nextInt(n)]; + } + return new String(buf); + } + private Helper create() { return new Helper(); } @@ -186,6 +194,14 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte buildNode(id).start(); } + ElasticsearchClient client(String id) { + return clients.get(id); + } + + String randomString(int n) { + return getRandomString(n); + } + private Node buildNode(String id) { Settings nodeSettings = settingsBuilder() .put(getNodeSettings()) @@ -198,19 +214,5 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte logger.info("clients={}", clients); return node; } - - String randomString(int len) { - final char[] buf = new char[len]; - final int n = numbersAndLetters.length - 1; - for (int i = 0; i < buf.length; i++) { - buf[i] = numbersAndLetters[random.nextInt(n)]; - } - return new String(buf); - } - - ElasticsearchClient client(String id) { - return clients.get(id); - } - } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index ae33bd9..c9a7cd7 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -44,7 +44,7 @@ class SmokeTest { client.waitForRecovery("test_smoke", 10L, TimeUnit.SECONDS); client.delete("test_smoke", "1"); client.deleteIndex("test_smoke"); - IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test_smoke", Settings.settingsBuilder() + IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test_smoke_2", Settings.settingsBuilder() .build()); assertEquals(0, indexDefinition.getReplicaLevel()); client.newIndex(indexDefinition); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java index 7d28686..dcd0e31 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java @@ -19,8 +19,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.Node; -import org.junit.jupiter.api.extension.AfterAllCallback; -import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ParameterContext; import org.junit.jupiter.api.extension.ParameterResolutionException; @@ -39,7 +39,7 @@ import java.util.Random; import static org.elasticsearch.common.settings.Settings.settingsBuilder; -public class TestExtension implements ParameterResolver, BeforeAllCallback, AfterAllCallback { +public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback { private static final Logger logger = LogManager.getLogger("test"); @@ -73,17 +73,14 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte @Override public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + setHome(System.getProperty("path.home") + "/" + getRandomString(8)); + setClusterName("test-cluster-" + System.getProperty("user.name")); return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create()); } @Override - public void beforeAll(ExtensionContext context) throws Exception { + public void beforeEach(ExtensionContext context) throws Exception { Helper helper = context.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class); - setHome(System.getProperty("path.home") + "/" + helper.randomString(8)); - setClusterName("test-cluster-" + System.getProperty("user.name")); - deleteFiles(Paths.get(getHome() + "/data")); - logger.info("data files wiped"); - Thread.sleep(2000L); // let OS commit changes logger.info("starting cluster"); helper.startNode("1"); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); @@ -114,9 +111,11 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte } @Override - public void afterAll(ExtensionContext context) throws Exception { + public void afterEach(ExtensionContext context) throws Exception { closeNodes(); deleteFiles(Paths.get(getHome() + "/data")); + logger.info("data files wiped"); + Thread.sleep(2000L); // let OS commit changes } private void setClusterName(String cluster) { @@ -169,6 +168,15 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte } } + private String getRandomString(int len) { + final char[] buf = new char[len]; + final int n = numbersAndLetters.length - 1; + for (int i = 0; i < buf.length; i++) { + buf[i] = numbersAndLetters[random.nextInt(n)]; + } + return new String(buf); + } + private Helper create() { return new Helper(); } @@ -179,6 +187,18 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte buildNode(id).start(); } + ElasticsearchClient client(String id) { + return clients.get(id); + } + + String getCluster() { + return getClusterName(); + } + + String randomString(int n) { + return getRandomString(n); + } + private Node buildNode(String id) { Settings nodeSettings = settingsBuilder() .put("cluster.name", getClusterName()) @@ -192,22 +212,5 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte logger.info("clients={}", clients); return node; } - - String randomString(int len) { - final char[] buf = new char[len]; - final int n = numbersAndLetters.length - 1; - for (int i = 0; i < buf.length; i++) { - buf[i] = numbersAndLetters[random.nextInt(n)]; - } - return new String(buf); - } - - ElasticsearchClient client(String id) { - return clients.get(id); - } - - String getCluster() { - return getClusterName(); - } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/ClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/ClientTest.java index 54b68c5..73c0f81 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/ClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/ClientTest.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.common.ClientBuilder; @@ -43,7 +42,6 @@ class ClientTest { ClientTest(TestExtension.Helper helper) { this.helper = helper; - helper.startNode("2"); } @Test @@ -145,7 +143,6 @@ class ClientTest { } @Test - @Disabled void testThreadedRandomDocs() throws Exception { int maxthreads = Runtime.getRuntime().availableProcessors(); long maxactions = MAX_ACTIONS_PER_REQUEST; diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index 8172f89..64207e7 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -25,9 +25,9 @@ class DuplicateIDTest { private final static Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName()); - private final static Long MAX_ACTIONS_PER_REQUEST = 10L; + private final static Long MAX_ACTIONS_PER_REQUEST = 100L; - private final static Long ACTIONS = 5L; + private final static Long ACTIONS = 50L; private final TestExtension.Helper helper; diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java index 1c57312..1f65064 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java @@ -33,7 +33,7 @@ class IndexShiftTest { } @Test - void testIndexAlias() throws Exception { + void testIndexShift() throws Exception { final ExtendedTransportClient client = ClientBuilder.builder() .provider(ExtendedTransportClientProvider.class) .put(helper.getTransportSettings()).build(); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index 0572ef5..be4a87b 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -45,7 +45,7 @@ class SmokeTest extends TestExtension { client.waitForRecovery("test_smoke", 10L, TimeUnit.SECONDS); client.delete("test_smoke", "1"); client.deleteIndex("test_smoke"); - IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test2", Settings.settingsBuilder() + IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test_smoke_2", Settings.settingsBuilder() .build()); assertEquals(0, indexDefinition.getReplicaLevel()); client.newIndex(indexDefinition); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java index 8c6fe19..e37ca8c 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java @@ -19,8 +19,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.Node; -import org.junit.jupiter.api.extension.AfterAllCallback; -import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ParameterContext; import org.junit.jupiter.api.extension.ParameterResolutionException; @@ -39,7 +39,7 @@ import java.util.Random; import static org.elasticsearch.common.settings.Settings.settingsBuilder; -public class TestExtension implements ParameterResolver, BeforeAllCallback, AfterAllCallback { +public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback { private static final Logger logger = LogManager.getLogger("test"); @@ -59,7 +59,7 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte private int port; - private static final String key = "es-instance"; + private static final String key = "es-test-instance"; private static final ExtensionContext.Namespace ns = ExtensionContext.Namespace.create(TestExtension.class); @@ -73,17 +73,14 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte @Override public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create()); + setHome(System.getProperty("path.home") + "/" + getRandomString(8)); + setClusterName("test-cluster-" + System.getProperty("user.name")); + return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class); } @Override - public void beforeAll(ExtensionContext context) throws Exception { - Helper helper = context.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class); - setHome(System.getProperty("path.home") + "/" + helper.randomString(8)); - setClusterName("test-cluster-" + System.getProperty("user.name")); - deleteFiles(Paths.get(getHome() + "/data")); - logger.info("data files wiped: " + getHome()); - Thread.sleep(2000L); // let OS commit changes + public void beforeEach(ExtensionContext extensionContext) throws Exception { + Helper helper = extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class); logger.info("starting cluster"); helper.startNode("1"); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); @@ -114,10 +111,11 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte } @Override - public void afterAll(ExtensionContext context) throws Exception { + public void afterEach(ExtensionContext context) throws Exception { closeNodes(); deleteFiles(Paths.get(getHome() + "/data")); - logger.info("cluster stopped"); + logger.info("data files wiped: " + getHome()); + Thread.sleep(2000L); // let OS commit changes } private void setClusterName(String cluster) { @@ -170,6 +168,15 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte } } + private String getRandomString(int len) { + final char[] buf = new char[len]; + final int n = numbersAndLetters.length - 1; + for (int i = 0; i < buf.length; i++) { + buf[i] = numbersAndLetters[random.nextInt(n)]; + } + return new String(buf); + } + private Helper create() { return new Helper(); } @@ -196,6 +203,18 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte buildNode(id).start(); } + ElasticsearchClient client(String id) { + return clients.get(id); + } + + String getCluster() { + return getClusterName(); + } + + String randomString(int n) { + return getRandomString(n); + } + private Node buildNode(String id) { Settings nodeSettings = settingsBuilder() .put(getNodeSettings()) @@ -208,22 +227,5 @@ public class TestExtension implements ParameterResolver, BeforeAllCallback, Afte logger.info("clients={}", clients); return node; } - - String randomString(int len) { - final char[] buf = new char[len]; - final int n = numbersAndLetters.length - 1; - for (int i = 0; i < buf.length; i++) { - buf[i] = numbersAndLetters[random.nextInt(n)]; - } - return new String(buf); - } - - ElasticsearchClient client(String id) { - return clients.get(id); - } - - String getCluster() { - return getClusterName(); - } } }