summaryrefslogtreecommitdiff
path: root/src/main/java/org/elasticsearch/index/translog/TranslogService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/elasticsearch/index/translog/TranslogService.java')
-rw-r--r--src/main/java/org/elasticsearch/index/translog/TranslogService.java211
1 files changed, 211 insertions, 0 deletions
diff --git a/src/main/java/org/elasticsearch/index/translog/TranslogService.java b/src/main/java/org/elasticsearch/index/translog/TranslogService.java
new file mode 100644
index 0000000..14a5e49
--- /dev/null
+++ b/src/main/java/org/elasticsearch/index/translog/TranslogService.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.translog;
+
+import jsr166y.ThreadLocalRandom;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
+import org.elasticsearch.index.settings.IndexSettings;
+import org.elasticsearch.index.settings.IndexSettingsService;
+import org.elasticsearch.index.shard.AbstractIndexShardComponent;
+import org.elasticsearch.index.shard.IllegalIndexShardStateException;
+import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.service.IndexShard;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.concurrent.ScheduledFuture;
+
+import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
+
+/**
+ *
+ */
+public class TranslogService extends AbstractIndexShardComponent {
+
+ private final ThreadPool threadPool;
+ private final IndexSettingsService indexSettingsService;
+ private final IndexShard indexShard;
+ private final Translog translog;
+
+ private volatile TimeValue interval;
+ private volatile int flushThresholdOperations;
+ private volatile ByteSizeValue flushThresholdSize;
+ private volatile TimeValue flushThresholdPeriod;
+ private volatile boolean disableFlush;
+ private volatile ScheduledFuture future;
+
+ private final ApplySettings applySettings = new ApplySettings();
+
+ @Inject
+ public TranslogService(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, ThreadPool threadPool, IndexShard indexShard, Translog translog) {
+ super(shardId, indexSettings);
+ this.threadPool = threadPool;
+ this.indexSettingsService = indexSettingsService;
+ this.indexShard = indexShard;
+ this.translog = translog;
+
+ this.flushThresholdOperations = componentSettings.getAsInt("flush_threshold_ops", componentSettings.getAsInt("flush_threshold", 5000));
+ this.flushThresholdSize = componentSettings.getAsBytesSize("flush_threshold_size", new ByteSizeValue(200, ByteSizeUnit.MB));
+ this.flushThresholdPeriod = componentSettings.getAsTime("flush_threshold_period", TimeValue.timeValueMinutes(30));
+ this.interval = componentSettings.getAsTime("interval", timeValueMillis(5000));
+ this.disableFlush = componentSettings.getAsBoolean("disable_flush", false);
+
+ logger.debug("interval [{}], flush_threshold_ops [{}], flush_threshold_size [{}], flush_threshold_period [{}]", interval, flushThresholdOperations, flushThresholdSize, flushThresholdPeriod);
+
+ this.future = threadPool.schedule(interval, ThreadPool.Names.SAME, new TranslogBasedFlush());
+
+ indexSettingsService.addListener(applySettings);
+ }
+
+
+ public void close() {
+ indexSettingsService.removeListener(applySettings);
+ this.future.cancel(true);
+ }
+
+ public static final String INDEX_TRANSLOG_FLUSH_INTERVAL = "index.translog.interval";
+ public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops";
+ public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
+ public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD = "index.translog.flush_threshold_period";
+ public static final String INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush";
+
+ class ApplySettings implements IndexSettingsService.Listener {
+ @Override
+ public void onRefreshSettings(Settings settings) {
+ int flushThresholdOperations = settings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, TranslogService.this.flushThresholdOperations);
+ if (flushThresholdOperations != TranslogService.this.flushThresholdOperations) {
+ logger.info("updating flush_threshold_ops from [{}] to [{}]", TranslogService.this.flushThresholdOperations, flushThresholdOperations);
+ TranslogService.this.flushThresholdOperations = flushThresholdOperations;
+ }
+ ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, TranslogService.this.flushThresholdSize);
+ if (!flushThresholdSize.equals(TranslogService.this.flushThresholdSize)) {
+ logger.info("updating flush_threshold_size from [{}] to [{}]", TranslogService.this.flushThresholdSize, flushThresholdSize);
+ TranslogService.this.flushThresholdSize = flushThresholdSize;
+ }
+ TimeValue flushThresholdPeriod = settings.getAsTime(INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, TranslogService.this.flushThresholdPeriod);
+ if (!flushThresholdPeriod.equals(TranslogService.this.flushThresholdPeriod)) {
+ logger.info("updating flush_threshold_period from [{}] to [{}]", TranslogService.this.flushThresholdPeriod, flushThresholdPeriod);
+ TranslogService.this.flushThresholdPeriod = flushThresholdPeriod;
+ }
+ TimeValue interval = settings.getAsTime(INDEX_TRANSLOG_FLUSH_INTERVAL, TranslogService.this.interval);
+ if (!interval.equals(TranslogService.this.interval)) {
+ logger.info("updating interval from [{}] to [{}]", TranslogService.this.interval, interval);
+ TranslogService.this.interval = interval;
+ }
+ boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, TranslogService.this.disableFlush);
+ if (disableFlush != TranslogService.this.disableFlush) {
+ logger.info("updating disable_flush from [{}] to [{}]", TranslogService.this.disableFlush, disableFlush);
+ TranslogService.this.disableFlush = disableFlush;
+ }
+ }
+ }
+
+ private TimeValue computeNextInterval() {
+ return new TimeValue(interval.millis() + (ThreadLocalRandom.current().nextLong(interval.millis())));
+ }
+
+ private class TranslogBasedFlush implements Runnable {
+
+ private volatile long lastFlushTime = System.currentTimeMillis();
+
+ @Override
+ public void run() {
+ if (indexShard.state() == IndexShardState.CLOSED) {
+ return;
+ }
+
+ // flush is disabled, but still reschedule
+ if (disableFlush) {
+ reschedule();
+ return;
+ }
+
+ if (indexShard.state() == IndexShardState.CREATED) {
+ reschedule();
+ return;
+ }
+
+ int currentNumberOfOperations = translog.estimatedNumberOfOperations();
+ if (currentNumberOfOperations == 0) {
+ reschedule();
+ return;
+ }
+
+ if (flushThresholdOperations > 0) {
+ if (currentNumberOfOperations > flushThresholdOperations) {
+ logger.trace("flushing translog, operations [{}], breached [{}]", currentNumberOfOperations, flushThresholdOperations);
+ asyncFlushAndReschedule();
+ return;
+ }
+ }
+
+ if (flushThresholdSize.bytes() > 0) {
+ long sizeInBytes = translog.translogSizeInBytes();
+ if (sizeInBytes > flushThresholdSize.bytes()) {
+ logger.trace("flushing translog, size [{}], breached [{}]", new ByteSizeValue(sizeInBytes), flushThresholdSize);
+ asyncFlushAndReschedule();
+ return;
+ }
+ }
+
+ if (flushThresholdPeriod.millis() > 0) {
+ if ((threadPool.estimatedTimeInMillis() - lastFlushTime) > flushThresholdPeriod.millis()) {
+ logger.trace("flushing translog, last_flush_time [{}], breached [{}]", lastFlushTime, flushThresholdPeriod);
+ asyncFlushAndReschedule();
+ return;
+ }
+ }
+
+ reschedule();
+ }
+
+ private void reschedule() {
+ future = threadPool.schedule(computeNextInterval(), ThreadPool.Names.SAME, this);
+ }
+
+ private void asyncFlushAndReschedule() {
+ threadPool.executor(ThreadPool.Names.FLUSH).execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ indexShard.flush(new Engine.Flush());
+ } catch (IllegalIndexShardStateException e) {
+ // we are being closed, or in created state, ignore
+ } catch (FlushNotAllowedEngineException e) {
+ // ignore this exception, we are not allowed to perform flush
+ } catch (Throwable e) {
+ logger.warn("failed to flush shard on translog threshold", e);
+ }
+ lastFlushTime = threadPool.estimatedTimeInMillis();
+
+ if (indexShard.state() != IndexShardState.CLOSED) {
+ future = threadPool.schedule(computeNextInterval(), ThreadPool.Names.SAME, TranslogBasedFlush.this);
+ }
+ }
+ });
+ }
+ }
+}