summaryrefslogtreecommitdiff
path: root/src/main/java/org/elasticsearch/common/recycler/Recyclers.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/elasticsearch/common/recycler/Recyclers.java')
-rw-r--r--src/main/java/org/elasticsearch/common/recycler/Recyclers.java252
1 files changed, 252 insertions, 0 deletions
diff --git a/src/main/java/org/elasticsearch/common/recycler/Recyclers.java b/src/main/java/org/elasticsearch/common/recycler/Recyclers.java
new file mode 100644
index 0000000..c1797bd
--- /dev/null
+++ b/src/main/java/org/elasticsearch/common/recycler/Recyclers.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.recycler;
+
+import com.carrotsearch.hppc.hash.MurmurHash3;
+import com.google.common.collect.Queues;
+import org.apache.lucene.util.CloseableThreadLocal;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+
+import java.lang.ref.SoftReference;
+
+public enum Recyclers {
+ ;
+
+ /** Return a {@link Recycler} that never recycles entries. */
+ public static <T> Recycler<T> none(Recycler.C<T> c) {
+ return new NoneRecycler<T>(c);
+ }
+
+ /** Return a concurrent recycler based on a deque. */
+ public static <T> Recycler<T> concurrentDeque(Recycler.C<T> c, int limit) {
+ return new ConcurrentDequeRecycler<T>(c, limit);
+ }
+
+ /** Return a recycler based on a deque. */
+ public static <T> Recycler<T> deque(Recycler.C<T> c, int limit) {
+ return new DequeRecycler<T>(c, Queues.<T>newArrayDeque(), limit);
+ }
+
+ /** Return a recycler based on a deque. */
+ public static <T> Recycler.Factory<T> dequeFactory(final Recycler.C<T> c, final int limit) {
+ return new Recycler.Factory<T>() {
+ @Override
+ public Recycler<T> build() {
+ return deque(c, limit);
+ }
+ };
+ }
+
+ /** Wrap two recyclers and forward to calls to <code>smallObjectRecycler</code> when <code>size &lt; minSize</code> and to
+ * <code>defaultRecycler</code> otherwise. */
+ public static <T> Recycler<T> sizing(final Recycler<T> defaultRecycler, final Recycler<T> smallObjectRecycler, final int minSize) {
+ return new FilterRecycler<T>() {
+
+ @Override
+ protected Recycler<T> getDelegate() {
+ return defaultRecycler;
+ }
+
+ @Override
+ public Recycler.V<T> obtain(int sizing) {
+ if (sizing > 0 && sizing < minSize) {
+ return smallObjectRecycler.obtain(sizing);
+ }
+ return super.obtain(sizing);
+ }
+
+ @Override
+ public void close() {
+ defaultRecycler.close();
+ smallObjectRecycler.close();
+ }
+
+ };
+ }
+
+ /** Create a thread-local recycler, where each thread will have its own instance, create through the provided factory. */
+ public static <T> Recycler<T> threadLocal(final Recycler.Factory<T> factory) {
+ return new FilterRecycler<T>() {
+
+ private final CloseableThreadLocal<Recycler<T>> recyclers;
+
+ {
+ recyclers = new CloseableThreadLocal<Recycler<T>>() {
+ @Override
+ protected Recycler<T> initialValue() {
+ return factory.build();
+ }
+ };
+ }
+
+ @Override
+ protected Recycler<T> getDelegate() {
+ return recyclers.get();
+ }
+
+ @Override
+ public void close() {
+ recyclers.close();
+ }
+
+ };
+ }
+
+ /** Create a recycler that is wrapped inside a soft reference, so that it cannot cause {@link OutOfMemoryError}s. */
+ public static <T> Recycler<T> soft(final Recycler.Factory<T> factory) {
+ return new FilterRecycler<T>() {
+
+ SoftReference<Recycler<T>> ref;
+
+ {
+ ref = new SoftReference<Recycler<T>>(null);
+ }
+
+ @Override
+ protected Recycler<T> getDelegate() {
+ Recycler<T> recycler = ref.get();
+ if (recycler == null) {
+ recycler = factory.build();
+ ref = new SoftReference<Recycler<T>>(recycler);
+ }
+ return recycler;
+ }
+
+ };
+ }
+
+ /** Create a recycler that wraps data in a SoftReference.
+ * @see #soft(org.elasticsearch.common.recycler.Recycler.Factory) */
+ public static <T> Recycler.Factory<T> softFactory(final Recycler.Factory<T> factory) {
+ return new Recycler.Factory<T>() {
+ @Override
+ public Recycler<T> build() {
+ return soft(factory);
+ }
+ };
+ }
+
+ /** Wrap the provided recycler so that calls to {@link Recycler#obtain()} and {@link Recycler.V#release()} are protected by
+ * a lock. */
+ public static <T> Recycler<T> locked(final Recycler<T> recycler) {
+ return new FilterRecycler<T>() {
+
+ private final Object lock;
+
+ {
+ this.lock = new Object();
+ }
+
+ @Override
+ protected Recycler<T> getDelegate() {
+ return recycler;
+ }
+
+ @Override
+ public org.elasticsearch.common.recycler.Recycler.V<T> obtain(int sizing) {
+ synchronized (lock) {
+ return super.obtain(sizing);
+ }
+ }
+
+ @Override
+ public org.elasticsearch.common.recycler.Recycler.V<T> obtain() {
+ synchronized (lock) {
+ return super.obtain();
+ }
+ }
+
+ @Override
+ protected Recycler.V<T> wrap(final Recycler.V<T> delegate) {
+ return new Recycler.V<T>() {
+
+ @Override
+ public boolean release() throws ElasticsearchException {
+ synchronized (lock) {
+ return delegate.release();
+ }
+ }
+
+ @Override
+ public T v() {
+ return delegate.v();
+ }
+
+ @Override
+ public boolean isRecycled() {
+ return delegate.isRecycled();
+ }
+
+ };
+ }
+
+ };
+ }
+
+ /** Create a concurrent implementation that can support concurrent access from <code>concurrencyLevel</code> threads with little contention. */
+ public static <T> Recycler<T> concurrent(final Recycler.Factory<T> factory, final int concurrencyLevel) {
+ if (concurrencyLevel < 1) {
+ throw new ElasticsearchIllegalArgumentException("concurrencyLevel must be >= 1");
+ }
+ if (concurrencyLevel == 1) {
+ return locked(factory.build());
+ }
+ return new FilterRecycler<T>() {
+
+ private final Recycler<T>[] recyclers;
+ {
+ @SuppressWarnings("unchecked")
+ final Recycler<T>[] recyclers = new Recycler[concurrencyLevel];
+ this.recyclers = recyclers;
+ for (int i = 0; i < concurrencyLevel; ++i) {
+ recyclers[i] = locked(factory.build());
+ }
+ }
+
+ final int slot() {
+ final long id = Thread.currentThread().getId();
+ // don't trust Thread.hashCode to have equiprobable low bits
+ int slot = (int) MurmurHash3.hash(id);
+ // make positive, otherwise % may return negative numbers
+ slot &= 0x7FFFFFFF;
+ slot %= concurrencyLevel;
+ return slot;
+ }
+
+ @Override
+ protected Recycler<T> getDelegate() {
+ return recyclers[slot()];
+ }
+
+ @Override
+ public void close() {
+ for (Recycler<T> recycler : recyclers) {
+ recycler.close();
+ }
+ }
+
+ };
+ }
+
+ public static <T> Recycler<T> concurrent(final Recycler.Factory<T> factory) {
+ return concurrent(factory, Runtime.getRuntime().availableProcessors());
+ }
+}