diff options
Diffstat (limited to 'src/main/java/org/elasticsearch/indices/InternalIndicesService.java')
-rw-r--r-- | src/main/java/org/elasticsearch/indices/InternalIndicesService.java | 377 |
1 files changed, 377 insertions, 0 deletions
diff --git a/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java new file mode 100644 index 0000000..8115962 --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -0,0 +1,377 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices; + +import com.google.common.collect.*; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; +import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.*; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.index.*; +import org.elasticsearch.index.aliases.IndexAliasesServiceModule; +import org.elasticsearch.index.analysis.AnalysisModule; +import org.elasticsearch.index.analysis.AnalysisService; +import org.elasticsearch.index.cache.IndexCache; +import org.elasticsearch.index.cache.IndexCacheModule; +import org.elasticsearch.index.codec.CodecModule; +import org.elasticsearch.index.engine.IndexEngine; +import org.elasticsearch.index.engine.IndexEngineModule; +import org.elasticsearch.index.fielddata.IndexFieldDataModule; +import org.elasticsearch.index.fielddata.IndexFieldDataService; +import org.elasticsearch.index.flush.FlushStats; +import org.elasticsearch.index.gateway.IndexGateway; +import org.elasticsearch.index.gateway.IndexGatewayModule; +import org.elasticsearch.index.get.GetStats; +import org.elasticsearch.index.indexing.IndexingStats; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.MapperServiceModule; +import org.elasticsearch.index.merge.MergeStats; +import org.elasticsearch.index.query.IndexQueryParserModule; +import org.elasticsearch.index.query.IndexQueryParserService; +import org.elasticsearch.index.refresh.RefreshStats; +import org.elasticsearch.index.search.stats.SearchStats; +import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.service.InternalIndexService; +import org.elasticsearch.index.settings.IndexSettingsModule; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.index.similarity.SimilarityModule; +import org.elasticsearch.index.store.IndexStore; +import org.elasticsearch.index.store.IndexStoreModule; +import org.elasticsearch.indices.analysis.IndicesAnalysisService; +import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.indices.store.IndicesStore; +import org.elasticsearch.plugins.IndexPluginsModule; +import org.elasticsearch.plugins.PluginsService; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static com.google.common.collect.Maps.newHashMap; +import static com.google.common.collect.Sets.newHashSet; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; + +/** + * + */ +public class InternalIndicesService extends AbstractLifecycleComponent<IndicesService> implements IndicesService { + + private final InternalIndicesLifecycle indicesLifecycle; + + private final IndicesAnalysisService indicesAnalysisService; + + private final IndicesStore indicesStore; + + private final Injector injector; + + private final PluginsService pluginsService; + + private final Map<String, Injector> indicesInjectors = new HashMap<String, Injector>(); + + private volatile ImmutableMap<String, IndexService> indices = ImmutableMap.of(); + + private final OldShardsStats oldShardsStats = new OldShardsStats(); + + @Inject + public InternalIndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, IndicesStore indicesStore, Injector injector) { + super(settings); + this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; + this.indicesAnalysisService = indicesAnalysisService; + this.indicesStore = indicesStore; + this.injector = injector; + + this.pluginsService = injector.getInstance(PluginsService.class); + + this.indicesLifecycle.addListener(oldShardsStats); + } + + @Override + protected void doStart() throws ElasticsearchException { + } + + @Override + protected void doStop() throws ElasticsearchException { + ImmutableSet<String> indices = ImmutableSet.copyOf(this.indices.keySet()); + final CountDownLatch latch = new CountDownLatch(indices.size()); + + final ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("indices_shutdown")); + final ExecutorService shardsStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("shards_shutdown")); + + for (final String index : indices) { + indicesStopExecutor.execute(new Runnable() { + @Override + public void run() { + try { + removeIndex(index, "shutdown", shardsStopExecutor); + } catch (Throwable e) { + logger.warn("failed to delete index on stop [" + index + "]", e); + } finally { + latch.countDown(); + } + } + }); + } + try { + latch.await(); + } catch (InterruptedException e) { + // ignore + } finally { + shardsStopExecutor.shutdown(); + indicesStopExecutor.shutdown(); + } + } + + @Override + protected void doClose() throws ElasticsearchException { + injector.getInstance(RecoverySettings.class).close(); + indicesStore.close(); + indicesAnalysisService.close(); + } + + @Override + public IndicesLifecycle indicesLifecycle() { + return this.indicesLifecycle; + } + + @Override + public NodeIndicesStats stats(boolean includePrevious) { + return stats(true, new CommonStatsFlags().all()); + } + + @Override + public NodeIndicesStats stats(boolean includePrevious, CommonStatsFlags flags) { + CommonStats oldStats = new CommonStats(flags); + + if (includePrevious) { + Flag[] setFlags = flags.getFlags(); + for (Flag flag : setFlags) { + switch (flag) { + case Get: + oldStats.get.add(oldShardsStats.getStats); + break; + case Indexing: + oldStats.indexing.add(oldShardsStats.indexingStats); + break; + case Search: + oldStats.search.add(oldShardsStats.searchStats); + break; + case Merge: + oldStats.merge.add(oldShardsStats.mergeStats); + break; + case Refresh: + oldStats.refresh.add(oldShardsStats.refreshStats); + break; + case Flush: + oldStats.flush.add(oldShardsStats.flushStats); + break; + } + } + } + + Map<Index, List<IndexShardStats>> statsByShard = Maps.newHashMap(); + for (IndexService indexService : indices.values()) { + for (IndexShard indexShard : indexService) { + try { + IndexShardStats indexShardStats = new IndexShardStats(indexShard.shardId(), new ShardStats[] { new ShardStats(indexShard, flags) }); + if (!statsByShard.containsKey(indexService.index())) { + statsByShard.put(indexService.index(), Lists.<IndexShardStats>newArrayList(indexShardStats)); + } else { + statsByShard.get(indexService.index()).add(indexShardStats); + } + } catch (IllegalIndexShardStateException e) { + // we can safely ignore illegal state on ones that are closing for example + } + } + } + return new NodeIndicesStats(oldStats, statsByShard); + } + + /** + * Returns <tt>true</tt> if changes (adding / removing) indices, shards and so on are allowed. + */ + public boolean changesAllowed() { + // we check on stop here since we defined stop when we delete the indices + return lifecycle.started(); + } + + @Override + public UnmodifiableIterator<IndexService> iterator() { + return indices.values().iterator(); + } + + public boolean hasIndex(String index) { + return indices.containsKey(index); + } + + public Set<String> indices() { + return newHashSet(indices.keySet()); + } + + public IndexService indexService(String index) { + return indices.get(index); + } + + @Override + public IndexService indexServiceSafe(String index) throws IndexMissingException { + IndexService indexService = indexService(index); + if (indexService == null) { + throw new IndexMissingException(new Index(index)); + } + return indexService; + } + + public synchronized IndexService createIndex(String sIndexName, Settings settings, String localNodeId) throws ElasticsearchException { + if (!lifecycle.started()) { + throw new ElasticsearchIllegalStateException("Can't create an index [" + sIndexName + "], node is closed"); + } + Index index = new Index(sIndexName); + if (indicesInjectors.containsKey(index.name())) { + throw new IndexAlreadyExistsException(index); + } + + indicesLifecycle.beforeIndexCreated(index); + + logger.debug("creating Index [{}], shards [{}]/[{}]", sIndexName, settings.get(SETTING_NUMBER_OF_SHARDS), settings.get(SETTING_NUMBER_OF_REPLICAS)); + + Settings indexSettings = settingsBuilder() + .put(this.settings) + .put(settings) + .classLoader(settings.getClassLoader()) + .build(); + + ModulesBuilder modules = new ModulesBuilder(); + modules.add(new IndexNameModule(index)); + modules.add(new LocalNodeIdModule(localNodeId)); + modules.add(new IndexSettingsModule(index, indexSettings)); + modules.add(new IndexPluginsModule(indexSettings, pluginsService)); + modules.add(new IndexStoreModule(indexSettings)); + modules.add(new IndexEngineModule(indexSettings)); + modules.add(new AnalysisModule(indexSettings, indicesAnalysisService)); + modules.add(new SimilarityModule(indexSettings)); + modules.add(new IndexCacheModule(indexSettings)); + modules.add(new IndexFieldDataModule(indexSettings)); + modules.add(new CodecModule(indexSettings)); + modules.add(new MapperServiceModule()); + modules.add(new IndexQueryParserModule(indexSettings)); + modules.add(new IndexAliasesServiceModule()); + modules.add(new IndexGatewayModule(indexSettings, injector.getInstance(Gateway.class))); + modules.add(new IndexModule(indexSettings)); + + Injector indexInjector; + try { + indexInjector = modules.createChildInjector(injector); + } catch (CreationException e) { + throw new IndexCreationException(index, Injectors.getFirstErrorFailure(e)); + } catch (Throwable e) { + throw new IndexCreationException(index, e); + } + + indicesInjectors.put(index.name(), indexInjector); + + IndexService indexService = indexInjector.getInstance(IndexService.class); + + indicesLifecycle.afterIndexCreated(indexService); + + indices = newMapBuilder(indices).put(index.name(), indexService).immutableMap(); + + return indexService; + } + + @Override + public void removeIndex(String index, String reason) throws ElasticsearchException { + removeIndex(index, reason, null); + } + + private synchronized void removeIndex(String index, String reason, @Nullable Executor executor) throws ElasticsearchException { + IndexService indexService; + Injector indexInjector = indicesInjectors.remove(index); + if (indexInjector == null) { + return; + } + + Map<String, IndexService> tmpMap = newHashMap(indices); + indexService = tmpMap.remove(index); + indices = ImmutableMap.copyOf(tmpMap); + + indicesLifecycle.beforeIndexClosed(indexService); + + for (Class<? extends CloseableIndexComponent> closeable : pluginsService.indexServices()) { + indexInjector.getInstance(closeable).close(); + } + + ((InternalIndexService) indexService).close(reason, executor); + + indexInjector.getInstance(IndexCache.class).close(); + indexInjector.getInstance(IndexFieldDataService.class).clear(); + indexInjector.getInstance(AnalysisService.class).close(); + indexInjector.getInstance(IndexEngine.class).close(); + + indexInjector.getInstance(IndexGateway.class).close(); + indexInjector.getInstance(MapperService.class).close(); + indexInjector.getInstance(IndexQueryParserService.class).close(); + + indexInjector.getInstance(IndexStore.class).close(); + + Injectors.close(injector); + + indicesLifecycle.afterIndexClosed(indexService.index()); + } + + static class OldShardsStats extends IndicesLifecycle.Listener { + + final SearchStats searchStats = new SearchStats(); + final GetStats getStats = new GetStats(); + final IndexingStats indexingStats = new IndexingStats(); + final MergeStats mergeStats = new MergeStats(); + final RefreshStats refreshStats = new RefreshStats(); + final FlushStats flushStats = new FlushStats(); + + @Override + public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) { + if (indexShard != null) { + getStats.add(indexShard.getStats()); + indexingStats.add(indexShard.indexingStats(), false); + searchStats.add(indexShard.searchStats(), false); + mergeStats.add(indexShard.mergeStats()); + refreshStats.add(indexShard.refreshStats()); + flushStats.add(indexShard.flushStats()); + } + } + } +}
\ No newline at end of file |