/*
 * Copyright (C) 2006 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Internal.toNanosSaturated;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;

import com.google.common.annotations.Beta;
import com.google.common.annotations.GwtCompatible;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.CollectionFuture.ListFuture;
import com.google.common.util.concurrent.ImmediateFuture.ImmediateCancelledFuture;
import com.google.common.util.concurrent.ImmediateFuture.ImmediateFailedFuture;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Static utility methods pertaining to the {@link Future} interface.
 *
 * Many of these methods use the {@link ListenableFuture} API; consult the Guava User Guide
 * article on {@code ListenableFuture}.
 *
 * The main purpose of {@code ListenableFuture} is to help you chain together a graph of
 * asynchronous operations. You can chain them together manually with calls to methods like {@link
 * Futures#transform(ListenableFuture, Function, Executor) Futures.transform}, but you will often
 * find it easier to use a framework. Frameworks automate the process, often adding features like
 * monitoring, debugging, and cancellation. Examples of frameworks include:
 *
 * If you do chain your operations manually, you may want to use {@link FluentFuture}.
*
* @author Kevin Bourrillion
* @author Nishant Thakkar
* @author Sven Mawson
* @since 1.0
*/
@GwtCompatible(emulated = true)
public final class Futures extends GwtFuturesCatchingSpecialization {
// A note on memory visibility.
// Many of the utilities in this class (transform, withFallback, withTimeout, asList, combine)
// have two requirements that significantly complicate their design.
// 1. Cancellation should propagate from the returned future to the input future(s).
// 2. The returned futures shouldn't unnecessarily 'pin' their inputs after completion.
//
// A consequence of these requirements is that the delegate futures cannot be stored in
// final fields.
//
// For simplicity the rest of this description will discuss Futures.catching since it is the
// simplest instance, though very similar descriptions apply to many other classes in this file.
//
// In the constructor of AbstractCatchingFuture, the delegate future is assigned to a field
// 'inputFuture'. That field is non-final and non-volatile. There are 2 places where the
// 'inputFuture' field is read and where we will have to consider visibility of the write
// operation in the constructor.
//
// 1. In the listener that performs the callback. In this case it is fine since inputFuture is
// assigned prior to calling addListener, and addListener happens-before any invocation of the
// listener. Notably, this means that 'volatile' is unnecessary to make 'inputFuture' visible
// to the listener.
//
// 2. In done() where we may propagate cancellation to the input. In this case it is _not_ fine.
// There is currently nothing that enforces that the write to inputFuture in the constructor is
// visible to done(). This is because there is no happens-before edge between the write and a
// (hypothetical) unsafe read by our caller. Note: adding 'volatile' does not fix this issue;
// it would just add an edge such that if done() observed non-null, then it would also
// definitely observe all earlier writes, but we still have no guarantee that done() would see
// the initial write (just stronger guarantees if it does).
//
// See: http://cs.oswego.edu/pipermail/concurrency-interest/2015-January/013800.html
// For a (long) discussion about this specific issue and the general futility of life.
//
// For the time being we are OK with the problem discussed above since it requires a caller to
// introduce a very specific kind of data-race. And given the other operations performed by these
// methods that involve volatile read/write operations, in practice there is no issue. Also, the
// way in which such a visibility issue would surface is most likely as a failure of cancel() to
// propagate to the input. Cancellation propagation is fundamentally racy so this is fine.
//
// Future versions of the JMM may revise safe construction semantics in such a way that we can
// safely publish these objects and we won't need this whole discussion.
// TODO(user,lukes): consider adding volatile to all these fields since in current known JVMs
// that should resolve the issue. This comes at the cost of adding more write barriers to the
// implementations.
private Futures() {}
/**
* Creates a {@code ListenableFuture} which has its value set immediately upon construction. The
* getters just return the value. This {@code Future} can't be canceled or timed out and its
* {@code isDone()} method always returns {@code true}.
*/
public static The returned {@code Future} can't be cancelled, and its {@code isDone()} method always
* returns {@code true}. Calling {@code get()} will immediately throw the provided {@code
* Throwable} wrapped in an {@code ExecutionException}.
*/
public static Usage example:
*
* When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
* the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
* documentation. All its warnings about heavyweight listeners are also applicable to heavyweight
* functions passed to this method.
*
* @param input the primary input {@code Future}
* @param exceptionType the exception type that triggers use of {@code fallback}. The exception
* type is matched against the input's exception. "The input's exception" means the cause of
* the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a
* different kind of exception, that exception itself. To avoid hiding bugs and other
* unrecoverable errors, callers should prefer more specific types, avoiding {@code
* Throwable.class} in particular.
* @param fallback the {@link Function} to be called if {@code input} fails with the expected
* exception type. The function's argument is the input's exception. "The input's exception"
* means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if
* {@code get()} throws a different kind of exception, that exception itself.
* @param executor the executor that runs {@code fallback} if {@code input} fails
* @since 19.0
*/
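// A minimal sketch of the fallback rule described above, NOT Guava's implementation. It assumes
// the JDK's CompletableFuture as a stand-in for ListenableFuture; the class name CatchingSketch
// and the unwrapping of CompletionException are assumptions made for illustration. It shows how
// "the input's exception" is matched against exceptionType before the fallback is applied.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Function;

final class CatchingSketch {
  private CatchingSketch() {}

  // Hypothetical JDK-only analog of Futures.catching.
  public static <V, X extends Throwable> CompletableFuture<V> catching(
      CompletableFuture<V> input,
      Class<X> exceptionType,
      Function<? super X, ? extends V> fallback,
      Executor executor) {
    return input.handleAsync(
        (value, failure) -> {
          if (failure == null) {
            return value; // The input succeeded; pass its result through unchanged.
          }
          // Dependent stages report failures wrapped in CompletionException, so unwrap
          // once to reach the exception the input actually failed with.
          Throwable cause =
              (failure instanceof CompletionException && failure.getCause() != null)
                  ? failure.getCause()
                  : failure;
          if (exceptionType.isInstance(cause)) {
            return fallback.apply(exceptionType.cast(cause));
          }
          // Not the expected type: keep the original failure.
          throw new CompletionException(cause);
        },
        executor);
  }
}
```

// Passing a narrow exceptionType (for example IOException.class rather than Throwable.class)
// keeps unrelated failures visible, which is the point made in the @param text above.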
@Beta
@Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
public static Usage examples:
*
* The fallback can also choose to propagate the original exception when desired:
*
* When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
* the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
* documentation. All its warnings about heavyweight listeners are also applicable to heavyweight
* functions passed to this method. (Specifically, {@code directExecutor} functions should avoid
* heavyweight operations inside {@code AsyncFunction.apply}. Any heavyweight operations should
* occur in other threads responsible for completing the returned {@code Future}.)
*
* @param input the primary input {@code Future}
* @param exceptionType the exception type that triggers use of {@code fallback}. The exception
* type is matched against the input's exception. "The input's exception" means the cause of
* the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a
* different kind of exception, that exception itself. To avoid hiding bugs and other
* unrecoverable errors, callers should prefer more specific types, avoiding {@code
* Throwable.class} in particular.
* @param fallback the {@link AsyncFunction} to be called if {@code input} fails with the expected
* exception type. The function's argument is the input's exception. "The input's exception"
* means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if
* {@code get()} throws a different kind of exception, that exception itself.
* @param executor the executor that runs {@code fallback} if {@code input} fails
* @since 19.0 (similar functionality in 14.0 as {@code withFallback})
*/
@Beta
@Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
public static The delegate future is interrupted and cancelled if it times out.
*
* @param delegate The future to delegate to.
* @param time when to time out the future
* @param scheduledExecutor The executor service to enforce the timeout.
* @since 28.0
*/
@Beta
@GwtIncompatible // java.util.concurrent.ScheduledExecutorService
public static The delegate future is interrupted and cancelled if it times out.
*
* @param delegate The future to delegate to.
* @param time when to time out the future
* @param unit the time unit of the time parameter
* @param scheduledExecutor The executor service to enforce the timeout.
* @since 19.0
*/
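// A rough sketch of the timeout behaviour described above, NOT Guava's implementation. It
// assumes the JDK's CompletableFuture as a stand-in for ListenableFuture; TimeoutSketch is a
// hypothetical name. Note one difference: CompletableFuture.cancel(true) does not interrupt a
// running task, so only the "cancelled" half of "interrupted and cancelled" is modelled here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutSketch {
  private TimeoutSketch() {}

  // Hypothetical JDK-only analog of Futures.withTimeout.
  public static <V> CompletableFuture<V> withTimeout(
      CompletableFuture<V> delegate,
      long time,
      TimeUnit unit,
      ScheduledExecutorService scheduledExecutor) {
    CompletableFuture<V> result = new CompletableFuture<>();
    // Arm the timer: if it wins the race, fail the result and cancel the delegate.
    ScheduledFuture<?> timer =
        scheduledExecutor.schedule(
            () -> {
              if (result.completeExceptionally(
                  new TimeoutException("Timed out after " + time + " " + unit))) {
                delegate.cancel(true);
              }
            },
            time,
            unit);
    // If the delegate finishes first, disarm the timer and mirror its outcome.
    delegate.whenComplete(
        (value, failure) -> {
          timer.cancel(false);
          if (failure != null) {
            result.completeExceptionally(failure);
          } else {
            result.complete(value);
          }
        });
    return result;
  }
}
```

// Cancelling the returned future is not propagated to the delegate in this sketch.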
@Beta
@GwtIncompatible // java.util.concurrent.ScheduledExecutorService
@SuppressWarnings("GoodTime") // should accept a java.time.Duration
public static More precisely, the returned {@code Future} takes its result from a {@code Future} produced
* by applying the given {@code AsyncFunction} to the result of the original {@code Future}.
* Example usage:
*
* When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
* the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
* documentation. All its warnings about heavyweight listeners are also applicable to heavyweight
* functions passed to this method. (Specifically, {@code directExecutor} functions should avoid
* heavyweight operations inside {@code AsyncFunction.apply}. Any heavyweight operations should
* occur in other threads responsible for completing the returned {@code Future}.)
*
* The returned {@code Future} attempts to keep its cancellation state in sync with that of the
* input future and that of the future returned by the chain function. That is, if the returned
* {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the
* other two is cancelled, the returned {@code Future} will receive a callback in which it will
* attempt to cancel itself.
*
* @param input The future to transform
* @param function A function to transform the result of the input future to the result of the
* output future
* @param executor Executor to run the function in.
* @return A future that holds the result of the function (if the input succeeded) or the original
* input's failure (if not)
* @since 19.0 (in 11.0 as {@code transform})
*/
@Beta
public static ListenableFuture When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
* the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
* documentation. All its warnings about heavyweight listeners are also applicable to heavyweight
* functions passed to this method.
*
* The returned {@code Future} attempts to keep its cancellation state in sync with that of the
* input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel
* the input, and if the input is cancelled, the returned {@code Future} will receive a callback
* in which it will attempt to cancel itself.
*
* An example use of this method is to convert a serializable object returned from an RPC into
* a POJO.
*
* @param input The future to transform
* @param function A Function to transform the results of the provided future to the results of
* the returned future.
* @param executor Executor to run the function in.
* @return A future that holds the result of the transformation.
* @since 9.0 (in 2.0 as {@code compose})
*/
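// A small sketch of the difference between a synchronous transformation (transform) and an
// asynchronous one (transformAsync), NOT Guava's implementation. It assumes the JDK's
// CompletableFuture as a stand-in for ListenableFuture; TransformSketch is a hypothetical name.
// Unlike the methods documented above, cancelling the returned CompletableFuture does not
// cancel the input.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

final class TransformSketch {
  private TransformSketch() {}

  // Analog of transform: the function maps the input's result to a plain value.
  public static <I, O> CompletableFuture<O> transform(
      CompletableFuture<I> input, Function<? super I, ? extends O> function, Executor executor) {
    return input.thenApplyAsync(function, executor);
  }

  // Analog of transformAsync: the function returns another future, and the returned
  // future takes its result from that second future.
  public static <I, O> CompletableFuture<O> transformAsync(
      CompletableFuture<I> input,
      Function<? super I, ? extends CompletableFuture<O>> function,
      Executor executor) {
    return input.thenComposeAsync(function, executor);
  }
}
```

// In both cases a failed input skips the function and the failure carries over to the result.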
@Beta
public static ListenableFuture The returned {@code Future} reflects the input's cancellation state directly, and any
* attempt to cancel the returned Future is likewise passed through to the input Future.
*
* Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} only apply the timeout
* to the execution of the underlying {@code Future}, not to the execution of the
* transformation function.
*
* The primary audience of this method is callers of {@code transform} who don't have a {@code
* ListenableFuture} available and do not mind repeated, lazy function evaluation.
*
* @param input The future to transform
* @param function A Function to transform the results of the provided future to the results of
* the returned future.
* @return A future that returns the result of the transformation.
* @since 10.0
*/
@Beta
@GwtIncompatible // TODO
public static Future The list of results is in the same order as the input list.
*
* This differs from {@link #successfulAsList(ListenableFuture[])} in that it will return a
* failed future if any of the items fails.
*
* Canceling this future will attempt to cancel all the component futures, and if any of the
* provided futures fails or is canceled, this one is, too.
*
* @param futures futures to combine
* @return a future that provides a list of the results of the component futures
* @since 10.0
*/
@Beta
@SafeVarargs
public static The list of results is in the same order as the input list.
*
* This differs from {@link #successfulAsList(Iterable)} in that it will return a failed future
* if any of the items fails.
*
* Canceling this future will attempt to cancel all the component futures, and if any of the
* provided futures fails or is canceled, this one is, too.
*
* @param futures futures to combine
* @return a future that provides a list of the results of the component futures
* @since 10.0
*/
@Beta
public static Any failures from the input futures will not be propagated to the returned future.
*
* @since 20.0
*/
@Beta
@SafeVarargs
public static Any failures from the input futures will not be propagated to the returned future.
*
* @since 20.0
*/
@Beta
public static If any input fails, the returned future fails immediately.
*
* @since 20.0
*/
@Beta
@SafeVarargs
public static If any input fails, the returned future fails immediately.
*
* @since 20.0
*/
@Beta
public static See {@link #whenAllComplete} and {@link #whenAllSucceed} for how to instantiate this class.
*
* Example:
*
* If the combiner throws a {@code CancellationException}, the returned future will be
* cancelled.
*
* If the combiner throws an {@code ExecutionException}, the cause of the thrown {@code
* ExecutionException} will be extracted and returned as the cause of the new {@code
* ExecutionException} that gets thrown by the returned combined future.
*
* Canceling this future will attempt to cancel all the component futures.
*/
public If the combiner throws a {@code CancellationException}, the returned future will be
* cancelled.
*
* If the combiner throws an {@code ExecutionException}, the cause of the thrown {@code
* ExecutionException} will be extracted and returned as the cause of the new {@code
* ExecutionException} that gets thrown by the returned combined future.
*
* Canceling this future will attempt to cancel all the component futures.
*/
// TODO(cpovirk): Remove this
public If the combiner throws a {@code CancellationException}, the returned future will be
* cancelled.
*
* Canceling this Future will attempt to cancel all the component futures.
*
* @since 23.6
*/
public ListenableFuture> run(final Runnable combiner, Executor executor) {
return call(
new Callable The list of results is in the same order as the input list.
*
* This differs from {@link #allAsList(ListenableFuture[])} in that it's tolerant of failed
* futures for any of the items, representing them as {@code null} in the result list.
*
* Canceling this future will attempt to cancel all the component futures.
*
* @param futures futures to combine
* @return a future that provides a list of the results of the component futures
* @since 10.0
*/
@Beta
@SafeVarargs
public static The list of results is in the same order as the input list.
*
* This differs from {@link #allAsList(Iterable)} in that it's tolerant of failed futures for
* any of the items, representing them as {@code null} in the result list.
*
* Canceling this future will attempt to cancel all the component futures.
*
* @param futures futures to combine
* @return a future that provides a list of the results of the component futures
* @since 10.0
*/
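// A sketch contrasting the two list-combining behaviours described above, NOT Guava's
// implementation. It assumes the JDK's CompletableFuture as a stand-in for ListenableFuture;
// AsListSketch is a hypothetical name. Two known differences: CompletableFuture.allOf waits
// for every input before completing, even when one has already failed, and cancelling the
// result does not cancel the inputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AsListSketch {
  private AsListSketch() {}

  // Analog of allAsList: results in input order; fails if any input fails.
  public static <V> CompletableFuture<List<V>> allAsList(List<CompletableFuture<V>> futures) {
    CompletableFuture<Void> all =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    return all.thenApply(
        ignored -> {
          List<V> results = new ArrayList<>();
          for (CompletableFuture<V> future : futures) {
            results.add(future.join()); // Every input is done here, so join() does not block.
          }
          return results;
        });
  }

  // Analog of successfulAsList: a failed or cancelled input becomes null in the result.
  public static <V> CompletableFuture<List<V>> successfulAsList(
      List<CompletableFuture<V>> futures) {
    List<CompletableFuture<V>> tolerant = new ArrayList<>();
    for (CompletableFuture<V> future : futures) {
      tolerant.add(future.exceptionally(failure -> null));
    }
    return allAsList(tolerant);
  }
}
```

// Because failures are represented as null, callers of the tolerant variant cannot tell a
// failed input from one that legitimately produced null.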
@Beta
public static "In the order that they complete" means, for practical purposes, about what you would
* expect, but there are some subtleties. First, we do guarantee that, if the output future at
* index n is done, the output future at index n-1 is also done. (But as usual with futures, some
* listeners for future n may complete before some for future n-1.) However, it is possible, if
* one input completes with result X and another later with result Y, for Y to come before X in
* the output future list. (Such races are impossible to solve without global synchronization of
* all future completions. And they should have little practical impact.)
*
* Cancelling a delegate future propagates to input futures once all the delegates complete,
* either from cancellation or because an input future has completed. If N futures are passed in,
* and M delegates are cancelled, the remaining M input futures will be cancelled once N - M of
* the input futures complete. If all the delegates are cancelled, all the input futures will be
* too.
*
* @since 17.0
*/
@Beta
public static The callback is run on {@code executor}. There is no guaranteed ordering of execution of
* callbacks, but any callback added through this method is guaranteed to be called once the
* computation is complete.
*
* Example:
*
* When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
* the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
* documentation. All its warnings about heavyweight listeners are also applicable to heavyweight
* callbacks passed to this method.
*
* For a more general interface to attach a completion listener to a {@code Future}, see {@link
* ListenableFuture#addListener addListener}.
*
* @param future The future to attach the callback to.
* @param callback The callback to invoke when {@code future} is completed.
* @param executor The executor to run {@code callback} when the future completes.
* @since 10.0
*/
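// A minimal sketch of attaching a success/failure callback that runs on a chosen executor,
// NOT Guava's implementation. It assumes the JDK's CompletableFuture as a stand-in for
// ListenableFuture; AddCallbackSketch and its nested Callback interface are hypothetical
// stand-ins introduced only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class AddCallbackSketch {
  private AddCallbackSketch() {}

  // Hypothetical callback type with one method per outcome.
  public interface Callback<V> {
    void onSuccess(V result);

    void onFailure(Throwable failure);
  }

  // Runs exactly one of the callback's methods on the executor once the future completes.
  public static <V> void addCallback(
      CompletableFuture<V> future, Callback<? super V> callback, Executor executor) {
    future.whenCompleteAsync(
        (result, failure) -> {
          if (failure == null) {
            callback.onSuccess(result);
          } else {
            callback.onFailure(failure);
          }
        },
        executor);
  }
}
```

// With a direct executor such as Runnable::run, the callback runs on whichever thread
// completes the future, so the warnings above about heavyweight callbacks apply.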
public static The benefits of this method are twofold. First, the name "getDone" suggests to readers that
* the {@code Future} is already done. Second, if buggy code calls {@code getDone} on a {@code
* Future} that is still pending, the program will throw instead of block. This can be important
* for APIs like {@link #whenAllComplete whenAllComplete(...)}{@code .}{@link
* FutureCombiner#call(Callable, Executor) call(...)}, where it is easy to use a new input from
* the {@code call} implementation but forget to add it to the arguments of {@code
* whenAllComplete}.
*
* If you are looking for a method to determine whether a given {@code Future} is done, use the
* instance method {@link Future#isDone()}.
*
* @throws ExecutionException if the {@code Future} failed with an exception
* @throws CancellationException if the {@code Future} was cancelled
* @throws IllegalStateException if the {@code Future} is not done
* @since 20.0
*/
// TODO(cpovirk): Consider calling getDone() in our own code.
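// A minimal sketch of the "throw instead of block" contract described above, using only the
// JDK. GetDoneSketch is a hypothetical name and the exception message is an assumption; the
// sketch checks isDone() first and only then reads the value without propagating interrupts.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class GetDoneSketch {
  private GetDoneSketch() {}

  public static <V> V getDone(Future<V> future) throws ExecutionException {
    if (!future.isDone()) {
      // A pending future indicates a caller bug, so fail fast rather than wait.
      throw new IllegalStateException("Future was expected to be done: " + future);
    }
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return future.get(); // Does not block for long: the future is already done.
        } catch (InterruptedException e) {
          interrupted = true; // Remember the interrupt and retry.
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // Restore the caller's interrupt status.
      }
    }
  }
}
```

// A cancelled future is done, so get() throws CancellationException as listed in the
// @throws tags above.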
public static Exceptions from {@code Future.get} are treated as follows:
*
* The overall principle is to continue to treat every checked exception as a checked
* exception, every unchecked exception as an unchecked exception, and every error as an error. In
* addition, the cause of any {@code ExecutionException} is wrapped in order to ensure that the
* new stack trace matches that of the current thread.
*
* Instances of {@code exceptionClass} are created by choosing an arbitrary public constructor
* that accepts zero or more arguments, all of type {@code String} or {@code Throwable}
* (preferring constructors with at least one {@code String}) and calling the constructor via
* reflection. If the exception did not already have a cause, one is set by calling {@link
* Throwable#initCause(Throwable)} on it. If no such constructor exists, an {@code
* IllegalArgumentException} is thrown.
*
* @throws X if {@code get} throws any checked exception except for an {@code ExecutionException}
* whose cause is not itself a checked exception
* @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with a
* {@code RuntimeException} as its cause
* @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code
* Error} as its cause
* @throws CancellationException if {@code get} throws a {@code CancellationException}
* @throws IllegalArgumentException if {@code exceptionClass} extends {@code RuntimeException} or
* does not have a suitable constructor
* @since 19.0 (in 10.0 as {@code get})
*/
@Beta
@GwtIncompatible // reflection
public static Exceptions from {@code Future.get} are treated as follows:
*
* The overall principle is to continue to treat every checked exception as a checked
* exception, every unchecked exception as an unchecked exception, and every error as an error. In
* addition, the cause of any {@code ExecutionException} is wrapped in order to ensure that the
* new stack trace matches that of the current thread.
*
* Instances of {@code exceptionClass} are created by choosing an arbitrary public constructor
* that accepts zero or more arguments, all of type {@code String} or {@code Throwable}
* (preferring constructors with at least one {@code String}) and calling the constructor via
* reflection. If the exception did not already have a cause, one is set by calling {@link
* Throwable#initCause(Throwable)} on it. If no such constructor exists, an {@code
* IllegalArgumentException} is thrown.
*
* @throws X if {@code get} throws any checked exception except for an {@code ExecutionException}
* whose cause is not itself a checked exception
* @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with a
* {@code RuntimeException} as its cause
* @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code
* Error} as its cause
* @throws CancellationException if {@code get} throws a {@code CancellationException}
* @throws IllegalArgumentException if {@code exceptionClass} extends {@code RuntimeException} or
* does not have a suitable constructor
* @since 28.0
*/
@Beta
@GwtIncompatible // reflection
public static Exceptions from {@code Future.get} are treated as follows:
*
* The overall principle is to continue to treat every checked exception as a checked
* exception, every unchecked exception as an unchecked exception, and every error as an error. In
* addition, the cause of any {@code ExecutionException} is wrapped in order to ensure that the
* new stack trace matches that of the current thread.
*
* Instances of {@code exceptionClass} are created by choosing an arbitrary public constructor
* that accepts zero or more arguments, all of type {@code String} or {@code Throwable}
* (preferring constructors with at least one {@code String}) and calling the constructor via
* reflection. If the exception did not already have a cause, one is set by calling {@link
* Throwable#initCause(Throwable)} on it. If no such constructor exists, an {@code
* IllegalArgumentException} is thrown.
*
* @throws X if {@code get} throws any checked exception except for an {@code ExecutionException}
* whose cause is not itself a checked exception
* @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with a
* {@code RuntimeException} as its cause
* @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code
* Error} as its cause
* @throws CancellationException if {@code get} throws a {@code CancellationException}
* @throws IllegalArgumentException if {@code exceptionClass} extends {@code RuntimeException} or
* does not have a suitable constructor
* @since 19.0 (in 10.0 as {@code get} and with different parameter order)
*/
@Beta
@GwtIncompatible // reflection
@SuppressWarnings("GoodTime") // should accept a java.time.Duration
public static Exceptions from {@code Future.get} are treated as follows:
*
* The overall principle is to eliminate all checked exceptions: to loop to avoid {@code
* InterruptedException}, to pass through {@code CancellationException}, and to wrap any exception
* from the underlying computation in an {@code UncheckedExecutionException} or {@code
* ExecutionError}.
*
* For an uninterruptible {@code get} that preserves other exceptions, see {@link
* Uninterruptibles#getUninterruptibly(Future)}.
*
* @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with an
* {@code Exception} as its cause
* @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code
* Error} as its cause
* @throws CancellationException if {@code get} throws a {@code CancellationException}
* @since 10.0
*/
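// A minimal sketch of the exception mapping described above, using only the JDK. It is NOT
// Guava's implementation: GetUncheckedSketch is a hypothetical name, and WrappedFailure and
// WrappedError are hypothetical stand-ins for UncheckedExecutionException and ExecutionError.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class GetUncheckedSketch {
  private GetUncheckedSketch() {}

  // Stand-in for UncheckedExecutionException (hypothetical).
  public static final class WrappedFailure extends RuntimeException {
    WrappedFailure(Throwable cause) {
      super(cause);
    }
  }

  // Stand-in for ExecutionError (hypothetical).
  public static final class WrappedError extends Error {
    WrappedError(Throwable cause) {
      super(cause);
    }
  }

  public static <V> V getUnchecked(Future<V> future) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return future.get(); // CancellationException is unchecked and passes through.
        } catch (InterruptedException e) {
          interrupted = true; // Loop instead of surfacing the checked exception.
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof Error) {
            throw new WrappedError(cause);
          }
          throw new WrappedFailure(cause);
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // Restore the interrupt for the caller.
      }
    }
  }
}
```

// The wrapper types keep the original failure reachable through getCause().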
public static {@code
* ListenableFuture
*
* {@code
* ListenableFuture
*
* {@code
* ListenableFuture
*
* {@code
* ListenableFuture
*
* {@code
* ListenableFuture
*
* > rowsFuture =
* transform(queryFuture, QueryResult::getRows, executor);
* }
> allAsList(ListenableFuture extends V>... futures) {
return new ListFuture
> allAsList(
Iterable extends ListenableFuture extends V>> futures) {
return new ListFuture
{@code
* final ListenableFuture
*
* @since 20.0
*/
@Beta
// TODO(cpovirk): Consider removing, especially if we provide run(Runnable)
@GwtCompatible
public static final class FutureCombiner> recentCommandsFuture =
* recentCommandsService.findRecentCommands(username);
* ListenableFuture
> successfulAsList(
ListenableFuture extends V>... futures) {
return new ListFuture
> successfulAsList(
Iterable extends ListenableFuture extends V>> futures) {
return new ListFuture
{@code
* ListenableFuture