re-add type parameter, less API methods

This commit is contained in:
Jörg Prante 2021-04-12 16:02:21 +02:00
parent 83b0c51ac9
commit b9c8cb7b9a
21 changed files with 249 additions and 203 deletions

View file

@ -1,3 +0,0 @@
language: java
jdk:
- openjdk11

View file

@ -4,11 +4,8 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.Flushable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public interface BulkClient extends BasicClient, Flushable {
@ -19,14 +16,6 @@ public interface BulkClient extends BasicClient, Flushable {
*/
BulkController getBulkController();
/**
* Create a new index.
*
* @param index index
* @throws IOException if new index creation fails
*/
void newIndex(String index) throws IOException;
/**
* Create a new index.
* @param indexDefinition the index definition
@ -34,58 +23,31 @@ public interface BulkClient extends BasicClient, Flushable {
*/
void newIndex(IndexDefinition indexDefinition) throws IOException;
/**
* Create a new index.
*
* @param index index
* @param settings settings
* @throws IOException if settings is invalid or index creation fails
*/
void newIndex(String index, Settings settings) throws IOException;
/**
* Create a new index.
*
* @param index index
* @param settings settings
* @param mapping mapping
* @throws IOException if settings/mapping is invalid or index creation fails
*/
void newIndex(String index, Settings settings, XContentBuilder mapping) throws IOException;
/**
* Create a new index.
*
* @param index index
* @param settings settings
* @param mapping mapping
* @throws IOException if settings/mapping is invalid or index creation fails
*/
void newIndex(String index, Settings settings, Map<String, ?> mapping) throws IOException;
/**
* Add index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when limits are exceeded.
*
* @param index the index
* @param type the type
* @param id the id
* @param create true if document must be created
* @param source the source
* @return this
*/
BulkClient index(String index, String id, boolean create, BytesReference source);
BulkClient index(String index, String type, String id, boolean create, BytesReference source);
/**
* Index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when limits are exceeded.
*
* @param index the index
* @param type the type
* @param id the id
* @param create true if document is to be created, false otherwise
* @param source the source
* @return this client methods
*/
BulkClient index(String index, String id, boolean create, String source);
BulkClient index(String index, String type, String id, boolean create, String source);
/**
* Index request. Each request will be added to a queue for bulking requests.
@ -100,10 +62,11 @@ public interface BulkClient extends BasicClient, Flushable {
* Delete request.
*
* @param index the index
* @param type the type
* @param id the id
* @return this
*/
BulkClient delete(String index, String id);
BulkClient delete(String index, String type, String id);
/**
* Delete request. Each request will be added to a queue for bulking requests.
@ -120,21 +83,23 @@ public interface BulkClient extends BasicClient, Flushable {
* Note that updates only work correctly when all operations between nodes are synchronized.
*
* @param index the index
* @param type the type
* @param id the id
* @param source the source
* @return this
*/
BulkClient update(String index, String id, BytesReference source);
BulkClient update(String index, String type, String id, BytesReference source);
/**
* Update document. Use with precaution! Does not work in all cases.
*
* @param index the index
* @param type the type
* @param id the id
* @param source the source
* @return this
*/
BulkClient update(String index, String id, String source);
BulkClient update(String index, String type, String id, String source);
/**
* Bulked update request. Each request will be added to a queue for bulking requests.

View file

@ -10,6 +10,10 @@ public interface IndexDefinition {
String getIndex();
IndexDefinition setType(String type);
String getType();
IndexDefinition setFullIndexName(String fullIndexName);
String getFullIndexName();

View file

@ -1,4 +0,0 @@
/**
* The API of the extended Elasticsearch clients.
*/
package org.xbib.elx.api;

View file

@ -16,7 +16,6 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.xbib.elx.api.BulkClient;
import org.xbib.elx.api.BulkController;
@ -71,51 +70,31 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
@Override
public void newIndex(IndexDefinition indexDefinition) throws IOException {
Settings settings = indexDefinition.getSettings() == null ? null :
Settings.builder().loadFromSource(indexDefinition.getSettings()).build();
Map<String, ?> mappings = indexDefinition.getMappings() == null ? null :
JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered();
newIndex(indexDefinition.getFullIndexName(), settings, mappings);
}
@Override
public void newIndex(String index) throws IOException {
newIndex(index, Settings.EMPTY, (XContentBuilder) null);
}
@Override
public void newIndex(String index, Settings settings) throws IOException {
newIndex(index, settings, (XContentBuilder) null);
}
@Override
public void newIndex(String index, Settings settings, Map<String, ?> mapping) throws IOException {
if (mapping == null || mapping.isEmpty()) {
newIndex(index, settings, (XContentBuilder) null);
} else {
newIndex(index, settings, JsonXContent.contentBuilder().map(mapping));
}
}
@Override
public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException {
String index = indexDefinition.getFullIndexName();
if (index == null) {
logger.warn("unable to create index, no index name given");
return;
throw new IllegalArgumentException("no index name given");
}
String type = indexDefinition.getType();
if (type == null) {
throw new IllegalArgumentException("no index type given");
}
ensureClientIsPresent();
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE)
.setIndex(index);
Settings settings = indexDefinition.getSettings() == null ? null :
Settings.builder().loadFromSource(indexDefinition.getSettings()).build();
if (settings != null) {
createIndexRequestBuilder.setSettings(settings);
}
if (builder != null) {
createIndexRequestBuilder.addMapping(TYPE_NAME, builder);
logger.debug("adding mapping = {}", builder.string());
// must be Map<String, Object> to match prototype of addMapping()!
Map<String, Object> mappings = indexDefinition.getMappings() == null ? null :
JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered();
if (mappings != null) {
logger.info("mappings = " + mappings);
createIndexRequestBuilder.addMapping(type, mappings);
} else {
createIndexRequestBuilder.addMapping(TYPE_NAME,
JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject());
logger.debug("empty mapping");
createIndexRequestBuilder.addMapping(type,
JsonXContent.contentBuilder().startObject().startObject(type).endObject().endObject());
}
CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet();
if (createIndexResponse.isAcknowledged()) {
@ -151,20 +130,20 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
}
@Override
public BulkClient index(String index, String id, boolean create, String source) {
public BulkClient index(String index, String type, String id, boolean create, String source) {
return index(new IndexRequest()
.index(index)
.type(TYPE_NAME)
.type(type)
.id(id)
.create(create)
.source(source)); // will be converted into a bytes reference
}
@Override
public BulkClient index(String index, String id, boolean create, BytesReference source) {
public BulkClient index(String index, String type, String id, boolean create, BytesReference source) {
return index(new IndexRequest()
.index(index)
.type(TYPE_NAME)
.type(type)
.id(id)
.create(create)
.source(source));
@ -178,10 +157,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
}
@Override
public BulkClient delete(String index, String id) {
public BulkClient delete(String index, String type, String id) {
return delete(new DeleteRequest()
.index(index)
.type(TYPE_NAME)
.type(type)
.id(id));
}
@ -193,15 +172,15 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
}
@Override
public BulkClient update(String index, String id, String source) {
return update(index, id, new BytesArray(source.getBytes(StandardCharsets.UTF_8)));
public BulkClient update(String index, String type, String id, String source) {
return update(index, type, id, new BytesArray(source.getBytes(StandardCharsets.UTF_8)));
}
@Override
public BulkClient update(String index, String id, BytesReference source) {
public BulkClient update(String index, String type, String id, BytesReference source) {
return update(new UpdateRequest()
.index(index)
.type(TYPE_NAME)
.type(type)
.id(id)
.doc(source.hasArray() ? source.array() : source.toBytes()));
}

View file

@ -126,8 +126,8 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
queryBuilder.accept(searchRequestBuilder);
searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
SearchResponse originalSearchResponse = searchRequestBuilder.execute().actionGet();
Stream<SearchResponse> responseStream = Stream.iterate(originalSearchResponse,
SearchResponse initialSearchResponse = searchRequestBuilder.execute().actionGet();
Stream<SearchResponse> responseStream = Stream.iterate(initialSearchResponse,
searchResponse -> {
SearchScrollRequestBuilder searchScrollRequestBuilder =
new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE)

View file

@ -12,6 +12,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
private String index;
private String type;
private String fullIndexName;
private DateTimeFormatter formatter;
@ -60,6 +62,18 @@ public class DefaultIndexDefinition implements IndexDefinition {
return index;
}
@Override
public IndexDefinition setType(String type) {
this.type = type;
return this;
}
@Override
public String getType() {
return type;
}
@Override
public IndexDefinition setFullIndexName(String fullIndexName) {
this.fullIndexName = fullIndexName;

View file

@ -40,17 +40,17 @@ public class MockBulkClient extends AbstractBulkClient {
}
@Override
public MockBulkClient index(String index, String id, boolean create, String source) {
public MockBulkClient index(String index, String type, String id, boolean create, String source) {
return this;
}
@Override
public MockBulkClient delete(String index, String id) {
public MockBulkClient delete(String index, String type, String id) {
return this;
}
@Override
public MockBulkClient update(String index, String id, String source) {
public MockBulkClient update(String index, String type, String id, String source) {
return this;
}

View file

@ -2,13 +2,14 @@ package org.xbib.elx.node.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.node.NodeAdminClient;
import org.xbib.elx.node.NodeAdminClientProvider;
@ -47,8 +48,11 @@ class BulkClientTest {
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30))
.build()) {
bulkClient.newIndex("test");
bulkClient.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setFullIndexName("test");
indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition);
bulkClient.index("test", "doc", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
@ -66,7 +70,10 @@ class BulkClientTest {
.put(helper.getNodeSettings())
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build()) {
bulkClient.newIndex("test");
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setFullIndexName("test");
indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition);
}
}
@ -90,7 +97,12 @@ class BulkClientTest {
.endObject()
.endObject()
.endObject();
bulkClient.newIndex("test", Settings.EMPTY, builder);
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setIndex("test");
indexDefinition.setFullIndexName("test");
indexDefinition.setType("doc");
indexDefinition.setMappings(builder.string());
bulkClient.newIndex(indexDefinition);
assertTrue(adminClient.getMapping("test").containsKey("properties"));
}
}
@ -104,9 +116,12 @@ class BulkClientTest {
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.build()) {
bulkClient.newIndex("test");
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setFullIndexName("test");
indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition);
for (int i = 0; i < ACTIONS; i++) {
bulkClient.index("test", null, false,
bulkClient.index("test", "doc", null, false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
bulkClient.flush();
@ -134,18 +149,25 @@ class BulkClientTest {
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), maxActionsPerRequest)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.build()) {
Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build();
bulkClient.newIndex("test", settings);
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setFullIndexName("test");
indexDefinition.setType("doc");
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("index")
.field("number_of_shards", 1)
.field("number_of_replicas", 0)
.endObject()
.endObject();
indexDefinition.setSettings(builder.string());
bulkClient.newIndex(indexDefinition);
bulkClient.startBulk("test", 0, 1000);
ExecutorService executorService = Executors.newFixedThreadPool(maxthreads);
final CountDownLatch latch = new CountDownLatch(maxthreads);
for (int i = 0; i < maxthreads; i++) {
executorService.execute(() -> {
for (int i1 = 0; i1 < actions; i1++) {
bulkClient.index("test", null, false,
bulkClient.index("test", "doc", null, false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
latch.countDown();

View file

@ -4,7 +4,9 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.node.NodeBulkClient;
import org.xbib.elx.node.NodeBulkClientProvider;
@ -38,9 +40,12 @@ class DuplicateIDTest {
.put(helper.getNodeSettings())
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.build()) {
bulkClient.newIndex("test");
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setFullIndexName("test");
indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition);
for (int i = 0; i < ACTIONS; i++) {
bulkClient.index("test", helper.randomString(1), false,
bulkClient.index("test", "doc", helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
bulkClient.flush();

View file

@ -2,7 +2,8 @@ package org.xbib.elx.node.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
@ -46,26 +47,31 @@ class IndexPruneTest {
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings())
.build()) {
Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build();
bulkClient.newIndex("test_prune1", settings);
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("index")
.field("number_of_shards", 1)
.field("number_of_replicas", 0)
.endObject()
.endObject();
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setIndex("test_prune");
indexDefinition.setType("doc");
indexDefinition.setFullIndexName("test_prune1");
indexDefinition.setSettings(builder.string());
bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test_prune2", settings);
indexDefinition.setFullIndexName("test_prune2");
bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test_prune3", settings);
indexDefinition.setFullIndexName("test_prune3");
bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test_prune4", settings);
indexDefinition.setFullIndexName("test_prune4");
bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
IndexRetention indexRetention = new DefaultIndexRetention();

View file

@ -4,7 +4,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -45,20 +46,25 @@ class IndexShiftTest {
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings())
.build()) {
Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build();
bulkClient.newIndex("test_shift", settings);
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("index")
.field("number_of_shards", 1)
.field("number_of_replicas", 0)
.endObject()
.endObject();
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setIndex("test");
indexDefinition.setType("doc");
indexDefinition.setFullIndexName("test_shift");
indexDefinition.setSettings(builder.string());
bulkClient.newIndex(indexDefinition);
for (int i = 0; i < 1; i++) {
bulkClient.index("test_shift", helper.randomString(1), false,
bulkClient.index("test_shift", "doc", helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setIndex("test");
indexDefinition.setFullIndexName("test_shift");
indexDefinition.setShift(true);
IndexShiftResult indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c"));
assertTrue(indexShiftResult.getNewAliases().contains("a"));
@ -76,9 +82,10 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test"));
bulkClient.newIndex("test_shift2", settings);
indexDefinition.setFullIndexName("test_shift2");
bulkClient.newIndex(indexDefinition);
for (int i = 0; i < 1; i++) {
bulkClient.index("test_shift2", helper.randomString(1), false,
bulkClient.index("test_shift2", "doc", helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
bulkClient.flush();

View file

@ -9,7 +9,9 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.node.NodeBulkClient;
import org.xbib.elx.node.NodeBulkClientProvider;
@ -24,7 +26,7 @@ class SearchTest {
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
private static final Long ACTIONS = 100000L;
private static final Long ACTIONS = 1000L;
private static final Long MAX_ACTIONS_PER_REQUEST = 100L;
@ -42,9 +44,12 @@ class SearchTest {
.put(helper.getNodeSettings())
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.build()) {
bulkClient.newIndex("test");
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setFullIndexName("test");
indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition);
for (int i = 0; i < numactions; i++) {
bulkClient.index("test", null, false,
bulkClient.index("test", "doc", null, false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
bulkClient.flush();

View file

@ -42,9 +42,11 @@ class SmokeTest {
adminClient.buildIndexDefinitionFromSettings("test_smoke_definition", Settings.EMPTY);
assertEquals(0, indexDefinition.getReplicaLevel());
assertEquals(helper.getClusterName(), adminClient.getClusterName());
bulkClient.newIndex("test_smoke");
indexDefinition.setFullIndexName("test_smoke");
indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition);
logger.info("new index: done");
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.index("test_smoke", "doc", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
logger.info("index doc: done");
bulkClient.flush();
logger.info("flush: done");
@ -52,18 +54,18 @@ class SmokeTest {
logger.info("wait: done");
adminClient.checkMapping("test_smoke");
logger.info("check mapping: done");
bulkClient.update("test_smoke", "1", "{ \"name\" : \"Another name\"}");
bulkClient.delete("test_smoke", "1");
bulkClient.update("test_smoke", "doc", "1", "{ \"name\" : \"Another name\"}");
bulkClient.delete("test_smoke", "doc", "1");
bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.delete("test_smoke", "1");
bulkClient.index("test_smoke", "doc", "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.delete("test_smoke", "doc", "1");
bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
adminClient.deleteIndex("test_smoke");
logger.info("delete index: done");
bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.index(indexDefinition.getFullIndexName(), "doc", "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
adminClient.updateReplicaLevel(indexDefinition, 2);

View file

@ -2,13 +2,14 @@ package org.xbib.elx.transport.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.transport.TransportAdminClient;
import org.xbib.elx.transport.TransportAdminClientProvider;
@ -47,8 +48,11 @@ class BulkClientTest {
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30))
.build()) {
bulkClient.newIndex("test");
bulkClient.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setFullIndexName("test");
indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition);
bulkClient.index("test", "docd", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
@ -65,7 +69,10 @@ class BulkClientTest {
.setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings())
.build()) {
bulkClient.newIndex("test");
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setFullIndexName("test");
indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition);
}
}
@ -89,7 +96,11 @@ class BulkClientTest {
.endObject()
.endObject()
.endObject();
bulkClient.newIndex("test", Settings.EMPTY, builder);
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setFullIndexName("test");
indexDefinition.setType("doc");
indexDefinition.setMappings(builder.string());
bulkClient.newIndex(indexDefinition);
assertTrue(adminClient.getMapping("test").containsKey("properties"));
}
}
@ -103,9 +114,12 @@ class BulkClientTest {
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.build()) {
bulkClient.newIndex("test");
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setFullIndexName("test");
indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition);
for (int i = 0; i < ACTIONS; i++) {
bulkClient.index("test", null, false,
bulkClient.index("test", "docs", null, false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
bulkClient.flush();
@ -134,11 +148,19 @@ class BulkClientTest {
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.put(Parameters.ENABLE_BULK_LOGGING.name(), "true")
.build()) {
Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build();
bulkClient.newIndex("test", settings);
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("index")
.field("number_of_shards", 1)
.field("number_of_replicas", 0)
.endObject()
.endObject();
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setIndex("test");
indexDefinition.setType("doc");
indexDefinition.setFullIndexName("test");
indexDefinition.setSettings(builder.string());
bulkClient.newIndex(indexDefinition);
bulkClient.startBulk("test", 0, 1000);
logger.info("index created");
ExecutorService executorService = Executors.newFixedThreadPool(maxthreads);
@ -146,7 +168,7 @@ class BulkClientTest {
for (int i = 0; i < maxthreads; i++) {
executorService.execute(() -> {
for (int i1 = 0; i1 < actions; i1++) {
bulkClient.index("test", null, false,
bulkClient.index("test", "docs", null, false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
latch.countDown();

View file

@ -4,7 +4,9 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.transport.TransportBulkClient;
import org.xbib.elx.transport.TransportBulkClientProvider;
@ -36,9 +38,12 @@ class DuplicateIDTest {
.put(helper.getTransportSettings())
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.build()) {
bulkClient.newIndex("test");
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setFullIndexName("test");
indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition);
for (int i = 0; i < ACTIONS; i++) {
bulkClient.index("test", helper.randomString(1), false,
bulkClient.index("test", "docs", helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
bulkClient.flush();

View file

@ -2,7 +2,8 @@ package org.xbib.elx.transport.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
@ -17,13 +18,10 @@ import org.xbib.elx.transport.TransportBulkClient;
import org.xbib.elx.transport.TransportBulkClientProvider;
import java.io.IOException;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -50,26 +48,31 @@ class IndexPruneTest {
.setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings())
.build()) {
Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build();
bulkClient.newIndex("test_prune1", settings);
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("index")
.field("number_of_shards", 1)
.field("number_of_replicas", 0)
.endObject()
.endObject();
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setIndex("test_prune");
indexDefinition.setType("doc");
indexDefinition.setFullIndexName("test_prune1");
indexDefinition.setSettings(builder.string());
bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test_prune2", settings);
indexDefinition.setFullIndexName("test_prune2");
bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test_prune3", settings);
indexDefinition.setFullIndexName("test_prune3");
bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test_prune4", settings);
indexDefinition.setFullIndexName("test_prune4");
bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
IndexRetention indexRetention = new DefaultIndexRetention();

View file

@ -4,7 +4,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -47,20 +48,25 @@ class IndexShiftTest {
.setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings())
.build()) {
Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build();
bulkClient.newIndex("test_shift", settings);
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("index")
.field("number_of_shards", 1)
.field("number_of_replicas", 0)
.endObject()
.endObject();
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setIndex("test");
indexDefinition.setType("doc");
indexDefinition.setFullIndexName("test_shift");
indexDefinition.setSettings(builder.string());
bulkClient.newIndex(indexDefinition);
for (int i = 0; i < 1; i++) {
bulkClient.index("test_shift", helper.randomString(1), false,
bulkClient.index("test_shift", "doc", helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setIndex("test");
indexDefinition.setFullIndexName("test_shift");
indexDefinition.setShift(true);
IndexShiftResult indexShiftResult =
adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c"));
@ -79,9 +85,10 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test"));
bulkClient.newIndex("test_shift2", settings);
indexDefinition.setFullIndexName("test_shift2");
bulkClient.newIndex(indexDefinition);
for (int i = 0; i < 1; i++) {
bulkClient.index("test_shift2", helper.randomString(1), false,
bulkClient.index("test_shift2", "doc", helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
bulkClient.flush();

View file

@ -7,7 +7,9 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.transport.TransportBulkClient;
import org.xbib.elx.transport.TransportBulkClientProvider;
@ -26,7 +28,7 @@ class SearchTest {
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
private static final Long ACTIONS = 100000L;
private static final Long ACTIONS = 1000L;
private static final Long MAX_ACTIONS_PER_REQUEST = 100L;
@ -44,9 +46,12 @@ class SearchTest {
.put(helper.getTransportSettings())
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.build()) {
bulkClient.newIndex("test");
for (int i = 0; i < ACTIONS; i++) {
bulkClient.index("test", null, false,
IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setFullIndexName("test");
indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition);
for (int i = 0; i < numactions; i++) {
bulkClient.index("test", "doc", null, false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
bulkClient.flush();

View file

@ -42,22 +42,24 @@ class SmokeTest {
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
assertEquals(0, indexDefinition.getReplicaLevel());
assertEquals(helper.getClusterName(), adminClient.getClusterName());
bulkClient.newIndex("test_smoke");
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
indexDefinition.setFullIndexName("test_smoke");
indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition);
bulkClient.index("test_smoke", "doc", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
adminClient.checkMapping("test_smoke");
bulkClient.update("test_smoke", "1", "{ \"name\" : \"Another name\"}");
bulkClient.delete("test_smoke", "1");
bulkClient.update("test_smoke", "doc", "1", "{ \"name\" : \"Another name\"}");
bulkClient.delete("test_smoke", "doc", "1");
bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.delete("test_smoke", "1");
bulkClient.index("test_smoke", "doc", "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.delete("test_smoke", "doc", "1");
bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
adminClient.deleteIndex("test_smoke");
bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.index(indexDefinition.getFullIndexName(), "doc", "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
adminClient.updateReplicaLevel(indexDefinition, 2);

View file

@ -1,14 +1,14 @@
group = org.xbib
name = elx
version = 2.2.1.29
version = 2.2.1.30
gradle.wrapper.version = 6.6.1
xbib-metrics.version = 2.1.0
xbib-guice.version = 4.4.2
xbib-guava.version = 28.1
xbib-netty-http.version = 4.1.54.1
xbib-netty-http.version = 4.1.60.1
elasticsearch.version = 2.2.1
jackson.version = 2.11.3
jackson.version = 2.11.4
jna.version = 5.5.0
log4j.version = 2.13.3
mustache.version = 0.9.5