summaryrefslogtreecommitdiff
path: root/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java')
-rw-r--r--src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java658
1 files changed, 658 insertions, 0 deletions
diff --git a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java
new file mode 100644
index 0000000..4789d6b
--- /dev/null
+++ b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java
@@ -0,0 +1,658 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster;
+
+import com.google.common.base.Predicate;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.PendingClusterTask;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.component.LifecycleComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.inject.Singleton;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.plugins.AbstractPlugin;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
+import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.hamcrest.Matchers.*;
+
+/**
+ *
+ */
+@ClusterScope(scope = Scope.TEST, numNodes = 0)
+public class ClusterServiceTests extends ElasticsearchIntegrationTest {
+
+ @Test
+ public void testTimeoutUpdateTask() throws Exception {
+ Settings settings = settingsBuilder()
+ .put("discovery.type", "local")
+ .build();
+ cluster().startNode(settings);
+ ClusterService clusterService1 = cluster().getInstance(ClusterService.class);
+ final CountDownLatch block = new CountDownLatch(1);
+ clusterService1.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ try {
+ block.await();
+ } catch (InterruptedException e) {
+ fail();
+ }
+ return currentState;
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ fail();
+ }
+ });
+
+ final CountDownLatch timedOut = new CountDownLatch(1);
+ final AtomicBoolean executeCalled = new AtomicBoolean();
+ clusterService1.submitStateUpdateTask("test2", new TimeoutClusterStateUpdateTask() {
+ @Override
+ public TimeValue timeout() {
+ return TimeValue.timeValueMillis(2);
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ timedOut.countDown();
+ }
+
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ executeCalled.set(true);
+ return currentState;
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ }
+ });
+
+ assertThat(timedOut.await(500, TimeUnit.MILLISECONDS), equalTo(true));
+ block.countDown();
+ Thread.sleep(100); // sleep a bit to double check that execute on the timed out update task is not called...
+ assertThat(executeCalled.get(), equalTo(false));
+ }
+
+ @Test
+ public void testAckedUpdateTask() throws Exception {
+ Settings settings = settingsBuilder()
+ .put("discovery.type", "local")
+ .build();
+ cluster().startNode(settings);
+ ClusterService clusterService = cluster().getInstance(ClusterService.class);
+
+ final AtomicBoolean allNodesAcked = new AtomicBoolean(false);
+ final AtomicBoolean ackTimeout = new AtomicBoolean(false);
+ final AtomicBoolean onFailure = new AtomicBoolean(false);
+ final AtomicBoolean executed = new AtomicBoolean(false);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch processedLatch = new CountDownLatch(1);
+ clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() {
+ @Override
+ public boolean mustAck(DiscoveryNode discoveryNode) {
+ return true;
+ }
+
+ @Override
+ public void onAllNodesAcked(@Nullable Throwable t) {
+ allNodesAcked.set(true);
+ latch.countDown();
+ }
+
+ @Override
+ public void onAckTimeout() {
+ ackTimeout.set(true);
+ latch.countDown();
+ }
+
+ @Override
+ public TimeValue ackTimeout() {
+ return TimeValue.timeValueSeconds(10);
+ }
+
+ @Override
+ public TimeValue timeout() {
+ return TimeValue.timeValueSeconds(10);
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ processedLatch.countDown();
+ }
+
+ @Override
+ public ClusterState execute(ClusterState currentState) throws Exception {
+ executed.set(true);
+ return ClusterState.builder(currentState).build();
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ logger.error("failed to execute callback in test {}", t, source);
+ onFailure.set(true);
+ latch.countDown();
+ }
+ });
+
+ assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true));
+
+ assertThat(allNodesAcked.get(), equalTo(true));
+ assertThat(ackTimeout.get(), equalTo(false));
+ assertThat(executed.get(), equalTo(true));
+ assertThat(onFailure.get(), equalTo(false));
+
+ assertThat(processedLatch.await(1, TimeUnit.SECONDS), equalTo(true));
+ }
+
+ @Test
+ public void testAckedUpdateTaskSameClusterState() throws Exception {
+ Settings settings = settingsBuilder()
+ .put("discovery.type", "local")
+ .build();
+ cluster().startNode(settings);
+ ClusterService clusterService = cluster().getInstance(ClusterService.class);
+
+ final AtomicBoolean allNodesAcked = new AtomicBoolean(false);
+ final AtomicBoolean ackTimeout = new AtomicBoolean(false);
+ final AtomicBoolean onFailure = new AtomicBoolean(false);
+ final AtomicBoolean executed = new AtomicBoolean(false);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch processedLatch = new CountDownLatch(1);
+ clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() {
+ @Override
+ public boolean mustAck(DiscoveryNode discoveryNode) {
+ return true;
+ }
+
+ @Override
+ public void onAllNodesAcked(@Nullable Throwable t) {
+ allNodesAcked.set(true);
+ latch.countDown();
+ }
+
+ @Override
+ public void onAckTimeout() {
+ ackTimeout.set(true);
+ latch.countDown();
+ }
+
+ @Override
+ public TimeValue ackTimeout() {
+ return TimeValue.timeValueSeconds(10);
+ }
+
+ @Override
+ public TimeValue timeout() {
+ return TimeValue.timeValueSeconds(10);
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ processedLatch.countDown();
+ }
+
+ @Override
+ public ClusterState execute(ClusterState currentState) throws Exception {
+ executed.set(true);
+ return currentState;
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ logger.error("failed to execute callback in test {}", t, source);
+ onFailure.set(true);
+ latch.countDown();
+ }
+ });
+
+ assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true));
+
+ assertThat(allNodesAcked.get(), equalTo(true));
+ assertThat(ackTimeout.get(), equalTo(false));
+ assertThat(executed.get(), equalTo(true));
+ assertThat(onFailure.get(), equalTo(false));
+
+ assertThat(processedLatch.await(1, TimeUnit.SECONDS), equalTo(true));
+ }
+
+ @Test
+ public void testAckedUpdateTaskNoAckExpected() throws Exception {
+ Settings settings = settingsBuilder()
+ .put("discovery.type", "local")
+ .build();
+ cluster().startNode(settings);
+ ClusterService clusterService = cluster().getInstance(ClusterService.class);
+
+ final AtomicBoolean allNodesAcked = new AtomicBoolean(false);
+ final AtomicBoolean ackTimeout = new AtomicBoolean(false);
+ final AtomicBoolean onFailure = new AtomicBoolean(false);
+ final AtomicBoolean executed = new AtomicBoolean(false);
+ final CountDownLatch latch = new CountDownLatch(1);
+ clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() {
+ @Override
+ public boolean mustAck(DiscoveryNode discoveryNode) {
+ return false;
+ }
+
+ @Override
+ public void onAllNodesAcked(@Nullable Throwable t) {
+ allNodesAcked.set(true);
+ latch.countDown();
+ }
+
+ @Override
+ public void onAckTimeout() {
+ ackTimeout.set(true);
+ latch.countDown();
+ }
+
+ @Override
+ public TimeValue ackTimeout() {
+ return TimeValue.timeValueSeconds(10);
+ }
+
+ @Override
+ public TimeValue timeout() {
+ return TimeValue.timeValueSeconds(10);
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ }
+
+ @Override
+ public ClusterState execute(ClusterState currentState) throws Exception {
+ executed.set(true);
+ return ClusterState.builder(currentState).build();
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ logger.error("failed to execute callback in test {}", t, source);
+ onFailure.set(true);
+ latch.countDown();
+ }
+ });
+
+ assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true));
+
+ assertThat(allNodesAcked.get(), equalTo(true));
+ assertThat(ackTimeout.get(), equalTo(false));
+ assertThat(executed.get(), equalTo(true));
+ assertThat(onFailure.get(), equalTo(false));
+ }
+
+ @Test
+ public void testAckedUpdateTaskTimeoutZero() throws Exception {
+ Settings settings = settingsBuilder()
+ .put("discovery.type", "local")
+ .build();
+ cluster().startNode(settings);
+ ClusterService clusterService = cluster().getInstance(ClusterService.class);
+
+ final AtomicBoolean allNodesAcked = new AtomicBoolean(false);
+ final AtomicBoolean ackTimeout = new AtomicBoolean(false);
+ final AtomicBoolean onFailure = new AtomicBoolean(false);
+ final AtomicBoolean executed = new AtomicBoolean(false);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch processedLatch = new CountDownLatch(1);
+ clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() {
+ @Override
+ public boolean mustAck(DiscoveryNode discoveryNode) {
+ return false;
+ }
+
+ @Override
+ public void onAllNodesAcked(@Nullable Throwable t) {
+ allNodesAcked.set(true);
+ latch.countDown();
+ }
+
+ @Override
+ public void onAckTimeout() {
+ ackTimeout.set(true);
+ latch.countDown();
+ }
+
+ @Override
+ public TimeValue ackTimeout() {
+ return TimeValue.timeValueSeconds(0);
+ }
+
+ @Override
+ public TimeValue timeout() {
+ return TimeValue.timeValueSeconds(10);
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ processedLatch.countDown();
+ }
+
+ @Override
+ public ClusterState execute(ClusterState currentState) throws Exception {
+ executed.set(true);
+ return ClusterState.builder(currentState).build();
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ logger.error("failed to execute callback in test {}", t, source);
+ onFailure.set(true);
+ latch.countDown();
+ }
+ });
+
+ assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true));
+
+ assertThat(allNodesAcked.get(), equalTo(false));
+ assertThat(ackTimeout.get(), equalTo(true));
+ assertThat(executed.get(), equalTo(true));
+ assertThat(onFailure.get(), equalTo(false));
+
+ assertThat(processedLatch.await(1, TimeUnit.SECONDS), equalTo(true));
+ }
+
+ @Test
+ public void testPendingUpdateTask() throws Exception {
+ Settings zenSettings = settingsBuilder()
+ .put("discovery.type", "zen").build();
+ String node_0 = cluster().startNode(zenSettings);
+ cluster().startNodeClient(zenSettings);
+
+
+ ClusterService clusterService = cluster().getInstance(ClusterService.class, node_0);
+ final CountDownLatch block1 = new CountDownLatch(1);
+ final CountDownLatch invoked1 = new CountDownLatch(1);
+ clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ invoked1.countDown();
+ try {
+ block1.await();
+ } catch (InterruptedException e) {
+ fail();
+ }
+ return currentState;
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ invoked1.countDown();
+ fail();
+ }
+ });
+ invoked1.await();
+ final CountDownLatch invoked2 = new CountDownLatch(9);
+ for (int i = 2; i <= 10; i++) {
+ clusterService.submitStateUpdateTask(Integer.toString(i), new ClusterStateUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ invoked2.countDown();
+ return currentState;
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ fail();
+ }
+ });
+ }
+
+ // The tasks can be re-ordered, so we need to check out-of-order
+ Set<String> controlSources = new HashSet<String>(Arrays.asList("2", "3", "4", "5", "6", "7", "8", "9", "10"));
+ List<PendingClusterTask> pendingClusterTasks = clusterService.pendingTasks();
+ assertThat(pendingClusterTasks.size(), equalTo(9));
+ for (PendingClusterTask task : pendingClusterTasks) {
+ assertTrue(controlSources.remove(task.source().string()));
+ }
+ assertTrue(controlSources.isEmpty());
+
+ controlSources = new HashSet<String>(Arrays.asList("2", "3", "4", "5", "6", "7", "8", "9", "10"));
+ PendingClusterTasksResponse response = cluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
+ assertThat(response.pendingTasks().size(), equalTo(9));
+ for (PendingClusterTask task : response) {
+ assertTrue(controlSources.remove(task.source().string()));
+ }
+ assertTrue(controlSources.isEmpty());
+ block1.countDown();
+ invoked2.await();
+
+ pendingClusterTasks = clusterService.pendingTasks();
+ assertThat(pendingClusterTasks, empty());
+ response = cluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
+ assertThat(response.pendingTasks(), empty());
+
+ final CountDownLatch block2 = new CountDownLatch(1);
+ final CountDownLatch invoked3 = new CountDownLatch(1);
+ clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ invoked3.countDown();
+ try {
+ block2.await();
+ } catch (InterruptedException e) {
+ fail();
+ }
+ return currentState;
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ invoked3.countDown();
+ fail();
+ }
+ });
+ invoked3.await();
+
+ for (int i = 2; i <= 5; i++) {
+ clusterService.submitStateUpdateTask(Integer.toString(i), new ClusterStateUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ return currentState;
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ fail();
+ }
+ });
+ }
+ Thread.sleep(100);
+
+ pendingClusterTasks = clusterService.pendingTasks();
+ assertThat(pendingClusterTasks.size(), equalTo(4));
+ controlSources = new HashSet<String>(Arrays.asList("2", "3", "4", "5"));
+ for (PendingClusterTask task : pendingClusterTasks) {
+ assertTrue(controlSources.remove(task.source().string()));
+ }
+ assertTrue(controlSources.isEmpty());
+
+ response = cluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
+ assertThat(response.pendingTasks().size(), equalTo(4));
+ controlSources = new HashSet<String>(Arrays.asList("2", "3", "4", "5"));
+ for (PendingClusterTask task : response) {
+ assertTrue(controlSources.remove(task.source().string()));
+ assertThat(task.getTimeInQueueInMillis(), greaterThan(0l));
+ }
+ assertTrue(controlSources.isEmpty());
+ block2.countDown();
+ }
+
+ @Test
+ public void testListenerCallbacks() throws Exception {
+ Settings settings = settingsBuilder()
+ .put("discovery.type", "zen")
+ .put("discovery.zen.minimum_master_nodes", 1)
+ .put("discovery.zen.ping_timeout", "200ms")
+ .put("discovery.initial_state_timeout", "500ms")
+ .put("plugin.types", TestPlugin.class.getName())
+ .build();
+
+ cluster().startNode(settings);
+ ClusterService clusterService1 = cluster().getInstance(ClusterService.class);
+ MasterAwareService testService1 = cluster().getInstance(MasterAwareService.class);
+
+ // the first node should be a master as the minimum required is 1
+ assertThat(clusterService1.state().nodes().masterNode(), notNullValue());
+ assertThat(clusterService1.state().nodes().localNodeMaster(), is(true));
+ assertThat(testService1.master(), is(true));
+
+ String node_1 = cluster().startNode(settings);
+ final ClusterService clusterService2 = cluster().getInstance(ClusterService.class, node_1);
+ MasterAwareService testService2 = cluster().getInstance(MasterAwareService.class, node_1);
+
+ ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet();
+ assertThat(clusterHealth.isTimedOut(), equalTo(false));
+
+ // the second node should not be the master as node1 is already the master.
+ assertThat(clusterService2.state().nodes().localNodeMaster(), is(false));
+ assertThat(testService2.master(), is(false));
+
+ cluster().stopCurrentMasterNode();
+ clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("1").execute().actionGet();
+ assertThat(clusterHealth.isTimedOut(), equalTo(false));
+
+ // now that node1 is closed, node2 should be elected as master
+ assertThat(clusterService2.state().nodes().localNodeMaster(), is(true));
+ assertThat(testService2.master(), is(true));
+
+ Settings newSettings = settingsBuilder()
+ .put("discovery.zen.minimum_master_nodes", 2)
+ .put("discovery.type", "zen")
+ .build();
+ client().admin().cluster().prepareUpdateSettings().setTransientSettings(newSettings).execute().actionGet();
+
+ // there should not be any master as the minimum number of required eligible masters is not met
+ awaitBusy(new Predicate<Object>() {
+ public boolean apply(Object obj) {
+ return clusterService2.state().nodes().masterNode() == null;
+ }
+ });
+ assertThat(testService2.master(), is(false));
+
+
+ String node_2 = cluster().startNode(settings);
+ clusterService1 = cluster().getInstance(ClusterService.class, node_2);
+ testService1 = cluster().getInstance(MasterAwareService.class, node_2);
+
+ // make sure both nodes see each other otherwise the masternode below could be null if node 2 is master and node 1 did'r receive the updated cluster state...
+ assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true).setWaitForNodes("2").execute().actionGet().isTimedOut(), is(false));
+ assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true).setWaitForNodes("2").execute().actionGet().isTimedOut(), is(false));
+
+ // now that we started node1 again, a new master should be elected
+ assertThat(clusterService1.state().nodes().masterNode(), is(notNullValue()));
+ if (node_2.equals(clusterService1.state().nodes().masterNode().name())) {
+ assertThat(testService1.master(), is(true));
+ assertThat(testService2.master(), is(false));
+ } else {
+ assertThat(testService1.master(), is(false));
+ assertThat(testService2.master(), is(true));
+ }
+
+ }
+
+ public static class TestPlugin extends AbstractPlugin {
+
+ @Override
+ public String name() {
+ return "test plugin";
+ }
+
+ @Override
+ public String description() {
+ return "test plugin";
+ }
+
+ @Override
+ public Collection<Class<? extends LifecycleComponent>> services() {
+ List<Class<? extends LifecycleComponent>> services = new ArrayList<Class<? extends LifecycleComponent>>(1);
+ services.add(MasterAwareService.class);
+ return services;
+ }
+ }
+
+ @Singleton
+ public static class MasterAwareService extends AbstractLifecycleComponent<MasterAwareService> implements LocalNodeMasterListener {
+
+ private final ClusterService clusterService;
+ private volatile boolean master;
+
+ @Inject
+ public MasterAwareService(Settings settings, ClusterService clusterService) {
+ super(settings);
+ clusterService.add(this);
+ this.clusterService = clusterService;
+ logger.info("initialized test service");
+ }
+
+ @Override
+ public void onMaster() {
+ logger.info("on master [" + clusterService.localNode() + "]");
+ master = true;
+ }
+
+ @Override
+ public void offMaster() {
+ logger.info("off master [" + clusterService.localNode() + "]");
+ master = false;
+ }
+
+ public boolean master() {
+ return master;
+ }
+
+ @Override
+ protected void doStart() throws ElasticsearchException {
+ }
+
+ @Override
+ protected void doStop() throws ElasticsearchException {
+ }
+
+ @Override
+ protected void doClose() throws ElasticsearchException {
+ }
+
+ @Override
+ public String executorName() {
+ return ThreadPool.Names.SAME;
+ }
+
+ }
+}