|
|
|
@ -1,5 +1,6 @@
|
|
|
|
|
package org.xbib.elx.common;
|
|
|
|
|
|
|
|
|
|
import org.apache.logging.log4j.Level;
|
|
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
|
|
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
|
|
|
|
@ -46,7 +47,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void init(Settings settings) throws IOException {
|
|
|
|
|
public void init(Settings settings) {
|
|
|
|
|
if (closed.compareAndSet(true, false)) {
|
|
|
|
|
super.init(settings);
|
|
|
|
|
bulkProcessor = new DefaultBulkProcessor(this, settings);
|
|
|
|
@ -79,7 +80,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void newIndex(IndexDefinition indexDefinition) throws IOException {
|
|
|
|
|
public void newIndex(IndexDefinition indexDefinition) {
|
|
|
|
|
if (isIndexDefinitionDisabled(indexDefinition)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -91,24 +92,32 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|
|
|
|
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE)
|
|
|
|
|
.setIndex(index);
|
|
|
|
|
if (indexDefinition.getSettings() == null) {
|
|
|
|
|
XContentBuilder builder = JsonXContent.contentBuilder()
|
|
|
|
|
.startObject()
|
|
|
|
|
.startObject("index")
|
|
|
|
|
.field("number_of_shards", 1)
|
|
|
|
|
.field("number_of_replicas", 0)
|
|
|
|
|
.endObject()
|
|
|
|
|
.endObject();
|
|
|
|
|
indexDefinition.setSettings(Strings.toString(builder));
|
|
|
|
|
try {
|
|
|
|
|
XContentBuilder builder = JsonXContent.contentBuilder()
|
|
|
|
|
.startObject()
|
|
|
|
|
.startObject("index")
|
|
|
|
|
.field("number_of_shards", 1)
|
|
|
|
|
.field("number_of_replicas", 0)
|
|
|
|
|
.endObject()
|
|
|
|
|
.endObject();
|
|
|
|
|
indexDefinition.setSettings(Strings.toString(builder));
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
logger.log(Level.WARN, e.getMessage(), e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Settings settings = Settings.builder().loadFromSource(indexDefinition.getSettings(), XContentType.JSON).build();
|
|
|
|
|
createIndexRequestBuilder.setSettings(settings);
|
|
|
|
|
if (indexDefinition.getMappings() != null) {
|
|
|
|
|
Map<String, Object> mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
|
|
|
|
|
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexDefinition.getMappings()).mapOrdered();
|
|
|
|
|
createIndexRequestBuilder.addMapping(TYPE_NAME, mappings);
|
|
|
|
|
} else {
|
|
|
|
|
XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject();
|
|
|
|
|
createIndexRequestBuilder.addMapping(TYPE_NAME, builder);
|
|
|
|
|
try {
|
|
|
|
|
if (indexDefinition.getMappings() != null) {
|
|
|
|
|
Map<String, Object> mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
|
|
|
|
|
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexDefinition.getMappings()).mapOrdered();
|
|
|
|
|
createIndexRequestBuilder.addMapping(TYPE_NAME, mappings);
|
|
|
|
|
} else {
|
|
|
|
|
XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject();
|
|
|
|
|
createIndexRequestBuilder.addMapping(TYPE_NAME, builder);
|
|
|
|
|
}
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
logger.log(Level.WARN, e.getMessage(), e);
|
|
|
|
|
}
|
|
|
|
|
CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet();
|
|
|
|
|
if (createIndexResponse.isAcknowledged()) {
|
|
|
|
@ -118,11 +127,11 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
// we really need state GREEN. If yellow, we may trigger shard write errors and queue will exceed quickly.
|
|
|
|
|
waitForCluster("GREEN", 300L, TimeUnit.SECONDS);
|
|
|
|
|
waitForHealthyCluster();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void startBulk(IndexDefinition indexDefinition) throws IOException {
|
|
|
|
|
public void startBulk(IndexDefinition indexDefinition) {
|
|
|
|
|
if (isIndexDefinitionDisabled(indexDefinition)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -133,7 +142,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void stopBulk(IndexDefinition indexDefinition) throws IOException {
|
|
|
|
|
public void stopBulk(IndexDefinition indexDefinition) {
|
|
|
|
|
if (isIndexDefinitionDisabled(indexDefinition)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -216,7 +225,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException {
|
|
|
|
|
public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) {
|
|
|
|
|
super.updateIndexSetting(index, key, value, timeout, timeUnit);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|