summaryrefslogtreecommitdiff
path: root/src/main/java/org/apache/lucene/search/XReferenceManager.java
blob: 07fb066fa8c94496ee091043c2abf1618a01f461 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
package org.apache.lucene.search;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Version;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Utility class to safely share instances of a certain type across multiple
 * threads, while periodically refreshing them. This class ensures each
 * reference is closed only once all threads have finished using it. It is
 * recommended to consult the documentation of {@link org.apache.lucene.search.XReferenceManager}
 * implementations for their {@link #maybeRefresh()} semantics.
 *
 * @param <G>
 *          the concrete type that will be {@link #acquire() acquired} and
 *          {@link #release(Object) released}.
 *
 * @lucene.experimental
 */
public abstract class XReferenceManager<G> implements Closeable {
  static {
    // Version guard: per the assertion message, this class is meant to be removed once the
    // Lucene version in use is no longer 4.6 (see LUCENE-5436).
    assert Version.LUCENE_46 == org.elasticsearch.Version.CURRENT.luceneVersion : "Remove this once we are on LUCENE_47 - see LUCENE-5436";
  }

  private static final String REFERENCE_MANAGER_IS_CLOSED_MSG = "this ReferenceManager is closed";

  /**
   * The reference currently handed out by {@link #acquire()}; {@code null}
   * once this manager has been {@link #close() closed}.
   */
  protected volatile G current;

  /**
   * Serializes refresh attempts. It is reentrant, which lets
   * {@link #doMaybeRefresh()} lock it again while its caller already holds it.
   */
  private final Lock refreshLock = new ReentrantLock();

  /** Listeners notified before and after every refresh attempt. */
  private final List<RefreshListener> refreshListeners = new CopyOnWriteArrayList<RefreshListener>();

  /**
   * Verifies that this manager still holds a current reference.
   * @throws org.apache.lucene.store.AlreadyClosedException if {@link #current} is {@code null},
   *         i.e. the reference manager has been {@link #close() closed}.
   */
  private void ensureOpen() {
    if (current == null) {
      throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG);
    }
  }

  /**
   * Installs {@code newReference} as the current reference and releases the
   * previous one. The new reference is assigned <em>before</em> the old one is
   * released, so {@link #acquire()} never sees a current reference that this
   * manager itself has already counted down.
   *
   * @param newReference the reference to publish; {@code null} when closing.
   * @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}.
   * @throws java.io.IOException if releasing the previous reference failed; the new
   *         reference has already been installed at that point.
   */
  private synchronized void swapReference(G newReference) throws IOException {
    ensureOpen();
    final G oldReference = current;
    current = newReference;
    release(oldReference);
  }

  /**
   * Decrement reference counting on the given reference.
   * @throws java.io.IOException if reference decrement on the given resource failed.
   * */
  protected abstract void decRef(G reference) throws IOException;

  /**
   * Refresh the given reference if needed. Returns {@code null} if no refresh
   * was needed, otherwise a new refreshed reference.
   * @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}.
   * @throws java.io.IOException if the refresh operation failed
   */
  protected abstract G refreshIfNeeded(G referenceToRefresh) throws IOException;

  /**
   * Try to increment reference counting on the given reference. Return true if
   * the operation was successful.
   * @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}.
   */
  protected abstract boolean tryIncRef(G reference) throws IOException;

  /**
   * Obtain the current reference. You must match every call to acquire with one
   * call to {@link #release}; it's best to do so in a finally clause, and set
   * the reference to {@code null} to prevent accidental usage after it has been
   * released.
   * @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}.
   * @throws IllegalStateException if the current reference can no longer be incremented
   *         although it is still the current one, which indicates that its reference
   *         count was modified outside of this manager.
   */
  public final G acquire() throws IOException {
    G ref;

    // Retry until we either pin the current reference or detect that the manager is
    // closed / broken; a failed tryIncRef normally just means a concurrent swap happened.
    do {
      if ((ref = current) == null) {
        throw new AlreadyClosedException(REFERENCE_MANAGER_IS_CLOSED_MSG);
      }
      if (tryIncRef(ref)) {
        return ref;
      }
      if (getRefCount(ref) == 0 && current == ref) {
        assert ref != null;
        /* If we can't increment the reference but it is
           still the current reference, the RM is in an
           illegal state since we can't make any progress
           anymore. The reference is closed but the RM still
           holds on to it as the actual instance.
           This can only happen if somebody outside of the RM
           decrements the refcount without a corresponding increment,
           since the RM assigns the new reference before counting down
           the old reference. */
        throw new IllegalStateException("The managed reference has already closed - this is likely a bug when the reference count is modified outside of the ReferenceManager");
      }
    } while (true);
  }

  /**
    * <p>
    * Closes this ReferenceManager to prevent future {@link #acquire() acquiring}. A
    * reference manager should be closed if the reference to the managed resource
    * should be disposed or the application using the {@link org.apache.lucene.search.XReferenceManager}
    * is shutting down. The managed resource might not be released immediately,
    * if the {@link org.apache.lucene.search.XReferenceManager} user is holding on to a previously
    * {@link #acquire() acquired} reference. The resource will be released
    * when the last reference is {@link #release(Object) released}. Those
    * references can still be used as if the manager was still active.
    * </p>
    * <p>
    * Applications should not {@link #acquire() acquire} new references from this
    * manager once this method has been called. {@link #acquire() Acquiring} a
    * resource on a closed {@link org.apache.lucene.search.XReferenceManager} will throw an
    * {@link org.apache.lucene.store.AlreadyClosedException}.
    * </p>
    * <p>
    * {@link #afterClose()} is invoked even if releasing the current reference
    * fails, because the manager is already marked closed at that point and a
    * later call to this method would be a no-op.
    * </p>
    *
    * @throws java.io.IOException
    *           if the underlying reader of the current reference could not be closed
   */
  @Override
  public final synchronized void close() throws IOException {
    if (current != null) {
      // make sure we can call this more than once
      // closeable javadoc says:
      // if this is already closed then invoking this method has no effect.
      try {
        swapReference(null);
      } finally {
        // swapReference nulls out 'current' before releasing the old reference, so a
        // failing release would otherwise leave the subclass without its afterClose()
        // callback forever (a second close() does nothing).
        afterClose();
      }
    }
  }

  /**
   * Returns the current reference count of the given reference.
   */
  protected abstract int getRefCount(G reference);

  /**
   *  Called after close(), so subclass can free any resources.
   *  @throws java.io.IOException if the after close operation in a sub-class throws an {@link java.io.IOException}
   * */
  protected void afterClose() throws IOException {
  }

  /**
   * Performs one refresh attempt: acquires the current reference, notifies the
   * listeners, asks the subclass for a refreshed reference and, if there is one,
   * swaps it in. Listeners that were told a refresh is about to start are always
   * told how it ended, even if an exception is thrown along the way.
   *
   * @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}.
   * @throws java.io.IOException if refreshing, releasing or a listener callback failed
   */
  private void doMaybeRefresh() throws IOException {
    // it's ok to call lock() here (blocking) because we're supposed to get here
    // from either maybeRefresh() or maybeRefreshBlocking(), after the lock has
    // already been obtained. Doing that protects us from an accidental bug
    // where this method will be called outside the scope of refreshLock.
    // Per ReentrantLock's javadoc, calling lock() by the same thread more than
    // once is ok, as long as unlock() is called a matching number of times.
    refreshLock.lock();
    boolean refreshed = false;
    try {
      final G reference = acquire();
      try {
        notifyRefreshListenersBefore();
        G newReference = refreshIfNeeded(reference);
        if (newReference != null) {
          assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";
          try {
            swapReference(newReference);
            refreshed = true;
          } finally {
            if (!refreshed) {
              // the swap failed, so nobody else will ever release the new reference
              release(newReference);
            }
          }
        }
      } finally {
        try {
          release(reference);
        } finally {
          // keep beforeRefresh()/afterRefresh() paired even when the release above fails
          notifyRefreshListenersRefreshed(refreshed);
        }
      }
      afterMaybeRefresh();
    } finally {
      refreshLock.unlock();
    }
  }

  /**
   * You must call this (or {@link #maybeRefreshBlocking()}), periodically, if
   * you want that {@link #acquire()} will return refreshed instances.
   *
   * <p>
   * <b>Threads</b>: it's fine for more than one thread to call this at once.
   * Only the first thread will attempt the refresh; subsequent threads will see
   * that another thread is already handling refresh and will return
   * immediately. Note that this means if another thread is already refreshing
   * then subsequent threads will return right away without waiting for the
   * refresh to complete.
   *
   * <p>
   * If this method returns true it means the calling thread either refreshed or
   * that there were no changes to refresh. If it returns false it means another
   * thread is currently refreshing.
   * </p>
   * @return {@code true} if the calling thread ran the refresh attempt,
   *         {@code false} if the refresh lock was held by another thread.
   * @throws java.io.IOException if refreshing the resource causes an {@link java.io.IOException}
   * @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}.
   */
  public final boolean maybeRefresh() throws IOException {
    ensureOpen();

    // Ensure only 1 thread does refresh at once; other threads just return immediately:
    final boolean doTryRefresh = refreshLock.tryLock();
    if (doTryRefresh) {
      try {
        doMaybeRefresh();
      } finally {
        refreshLock.unlock();
      }
    }

    return doTryRefresh;
  }

  /**
   * You must call this (or {@link #maybeRefresh()}), periodically, if you want
   * that {@link #acquire()} will return refreshed instances.
   *
   * <p>
   * <b>Threads</b>: unlike {@link #maybeRefresh()}, if another thread is
   * currently refreshing, this method blocks until that thread completes. It is
   * useful if you want to guarantee that the next call to {@link #acquire()}
   * will return a refreshed instance. Otherwise, consider using the
   * non-blocking {@link #maybeRefresh()}.
   * @throws java.io.IOException if refreshing the resource causes an {@link java.io.IOException}
   * @throws org.apache.lucene.store.AlreadyClosedException if the reference manager has been {@link #close() closed}.
   */
  public final void maybeRefreshBlocking() throws IOException {
    ensureOpen();

    // Ensure only 1 thread does refresh at once
    refreshLock.lock();
    try {
      doMaybeRefresh();
    } finally {
      refreshLock.unlock();
    }
  }

  /** Called after a refresh was attempted, regardless of
   *  whether a new reference was in fact created.
   *  @throws java.io.IOException if a low level I/O exception occurs
   **/
  protected void afterMaybeRefresh() throws IOException {
  }

  /**
   * Release the reference previously obtained via {@link #acquire()}.
   * <p>
   * <b>NOTE:</b> it's safe to call this after {@link #close()}.
   * @param reference the reference to release; must not be {@code null}.
   * @throws java.io.IOException if the release operation on the given resource throws an {@link java.io.IOException}
   */
  public final void release(G reference) throws IOException {
    assert reference != null;
    decRef(reference);
  }

  /**
   * Invokes {@link RefreshListener#beforeRefresh()} on every registered listener, in
   * registration order. An exception thrown by a listener stops the iteration and propagates.
   */
  private void notifyRefreshListenersBefore() throws IOException {
    for (RefreshListener refreshListener : refreshListeners) {
      refreshListener.beforeRefresh();
    }
  }

  /**
   * Invokes {@link RefreshListener#afterRefresh(boolean)} on every registered listener, in
   * registration order. An exception thrown by a listener stops the iteration and propagates.
   *
   * @param didRefresh whether the refresh attempt installed a new reference.
   */
  private void notifyRefreshListenersRefreshed(boolean didRefresh) throws IOException {
    for (RefreshListener refreshListener : refreshListeners) {
      refreshListener.afterRefresh(didRefresh);
    }
  }

  /**
   * Adds a listener, to be notified when a reference is refreshed/swapped.
   * @throws NullPointerException if {@code listener} is {@code null}.
   */
  public void addListener(RefreshListener listener) {
    if (listener == null) {
      throw new NullPointerException("Listener cannot be null");
    }
    refreshListeners.add(listener);
  }

  /**
   * Remove a listener added with {@link #addListener(RefreshListener)}.
   * @throws NullPointerException if {@code listener} is {@code null}.
   */
  public void removeListener(RefreshListener listener) {
    if (listener == null) {
      throw new NullPointerException("Listener cannot be null");
    }
    refreshListeners.remove(listener);
  }

  /** Use to receive notification when a refresh has
   *  finished.  See {@link #addListener}. */
  public interface RefreshListener {

    /** Called right before a refresh attempt starts. */
    void beforeRefresh() throws IOException;

    /** Called after the attempted refresh; if the refresh
     * did open a new reference then didRefresh will be true
     * and {@link #acquire()} is guaranteed to return the new
     * reference. */
    void afterRefresh(boolean didRefresh) throws IOException;
  }
}