/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef _DECAF_UTIL_CONCURRENT_FUTURETASK_H_ #define _DECAF_UTIL_CONCURRENT_FUTURETASK_H_ #include #include #include #include #include #include #include #include #include #include #include namespace decaf { namespace util { namespace concurrent { using decaf::lang::Pointer; /** * A cancellable asynchronous computation. This class provides a base implementation of * Future, with methods to start and cancel a computation, query to see if the computation * is complete, and retrieve the result of the computation. The result can only be retrieved * when the computation has completed; the get method will block if the computation has not * yet completed. Once the computation has completed, the computation cannot be restarted * or canceled. * * A FutureTask can be used to wrap a Callable or Runnable object. Because FutureTask * implements Runnable, a FutureTask can be submitted to an Executor for execution. * * In addition to serving as a stand-alone class, this class provides protected functionality * that may be useful when creating customized task classes. * * @since 1.0 */ template class FutureTask : public RunnableFuture { private: /** * A Callable subclass that runs given task and returns given result, used to * wrap either a Runnable or Callable pointer and */ class FutureTaskAdapter : public decaf::util::concurrent::Callable { private: decaf::lang::Runnable* task; decaf::util::concurrent::Callable* callable; bool owns; T result; private: FutureTaskAdapter(const FutureTaskAdapter&); FutureTaskAdapter operator= (const FutureTaskAdapter&); public: FutureTaskAdapter(decaf::lang::Runnable* task, const T& result, bool owns = true) : decaf::util::concurrent::Callable(), task(task), callable(NULL), owns(owns), result(result) { } FutureTaskAdapter(decaf::util::concurrent::Callable* task, bool owns = true) : decaf::util::concurrent::Callable(), task(NULL), callable(task), owns(owns), result(T()) { } virtual ~FutureTaskAdapter() { try{ if (owns) { delete this->task; delete this->callable; } } DECAF_CATCHALL_NOTHROW() } virtual T call() { if (this->task != NULL) { this->task->run(); return result; } else { return this->callable->call(); } } }; /** * Synchronization control for FutureTask. * * Uses AQS sync state to represent run status */ class FutureTaskSync : public locks::AbstractQueuedSynchronizer { private: enum SyncState { /** State value representing that task is ready to run */ READY = 0, /** State value representing that task is running */ RUNNING = 1, /** State value representing that task ran */ RAN = 2, /** State value representing that task was canceled */ CANCELLED = 4 }; /** The underlying callable */ Pointer< Callable > callable; /** The result to return from get() */ T result; /** The exception to throw from get() */ Pointer exception; // The FutureTask parent of the Sync object. FutureTask* parent; /** * The thread running task. When nulled after set/cancel, this indicates that * the results are accessible. */ decaf::lang::Thread* runner; private: FutureTaskSync(const FutureTaskSync&); FutureTaskSync operator= (const FutureTaskSync&); public: FutureTaskSync(FutureTask* parent, Callable* callable) : AbstractQueuedSynchronizer(), callable(callable), result(), exception(), parent(parent), runner(NULL) { } virtual ~FutureTaskSync() { } bool innerIsCancelled() const { return getState() == CANCELLED; } bool innerIsDone() const { return ranOrCancelled(getState()) && this->runner == NULL; } T innerGet() { this->acquireSharedInterruptibly(0); if (getState() == CANCELLED) { throw CancellationException(); } if (exception != NULL) { throw ExecutionException(exception->clone()); } return result; } T innerGet(long long nanosTimeout) { if (!tryAcquireSharedNanos(0, nanosTimeout)) { throw TimeoutException(); } if (getState() == CANCELLED) { throw CancellationException(); } if (exception != NULL) { throw ExecutionException(exception->clone()); } return result; } void innerSet(const T& result) { for (;;) { int s = getState(); if (s == RAN) { return; } if (s == CANCELLED) { // aggressively release to set runner to null, // in case we are racing with a cancel request // that will try to interrupt runner releaseShared(0); return; } if (compareAndSetState(s, RAN)) { this->result = result; releaseShared(0); this->parent->done(); return; } } } void innerSetException(const decaf::lang::Exception& t) { for (;;) { int s = getState(); if (s == RAN) { return; } if (s == CANCELLED) { // aggressively release to set runner to null, // in case we are racing with a cancel request // that will try to interrupt runner releaseShared(0); return; } if (compareAndSetState(s, RAN)) { exception.reset(t.clone()); releaseShared(0); this->parent->done(); return; } } } bool innerCancel(bool mayInterruptIfRunning) { for (;;) { int s = getState(); if (ranOrCancelled(s)) { return false; } if (compareAndSetState(s, CANCELLED)) { break; } } if (mayInterruptIfRunning) { decaf::lang::Thread* r = runner; if (r != NULL) { r->interrupt(); } } releaseShared(0); this->parent->done(); return true; } void innerRun() { if (!compareAndSetState(READY, RUNNING)) { return; } this->runner = decaf::lang::Thread::currentThread(); if (getState() == RUNNING) { // recheck after setting thread T result; try { result = this->callable->call(); } catch(decaf::lang::Exception& ex) { this->parent->setException(ex); return; } catch(std::exception& stdex) { this->parent->setException(decaf::lang::Exception(&stdex)); return; } catch(...) { this->parent->setException(decaf::lang::Exception( __FILE__, __LINE__, "FutureTask Caught Unknown exception during task execution.")); return; } this->parent->set(result); } else { releaseShared(0); // cancel } } bool innerRunAndReset() { if (!compareAndSetState(READY, RUNNING)) { return false; } try { this->runner = decaf::lang::Thread::currentThread(); if (getState() == RUNNING) { this->callable->call(); // don't set result } this->runner = NULL; return compareAndSetState(RUNNING, READY); } catch(decaf::lang::Exception& ex) { this->parent->setException(ex); return false; } catch(std::exception& stdex) { this->parent->setException(decaf::lang::Exception(&stdex)); return false; } catch(...) { this->parent->setException(decaf::lang::Exception( __FILE__, __LINE__, "FutureTask Caught Unknown exception during task execution.")); return false; } } protected: /** * Implements AQS base acquire to succeed if ran or cancelled */ virtual int tryAcquireShared(int ignore DECAF_UNUSED) { return innerIsDone() ? 1 : -1; } /** * Implements AQS base release to always signal after setting * final done status by nulling runner thread. */ virtual bool tryReleaseShared(int ignore DECAF_UNUSED) { runner = NULL; return true; } private: bool ranOrCancelled(int state) const { return (state & (RAN | CANCELLED)) != 0; } }; private: Pointer sync; public: /** * Creates a FutureTask instance that will, upon running, execute the * given Callable. * * @param callable * The callable task that will be invoked when run. * @param takeOwnership * Boolean value indicating if the Executor now owns the pointer to the task. * * @throws NullPointerException if callable pointer is NULL */ FutureTask(Callable* callable, bool takeOwnership = true) : sync(NULL) { if (callable == NULL ) { throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__, "The Callable pointer passed to the constructor was NULL"); } this->sync.reset(new FutureTaskSync(this, new FutureTaskAdapter(callable, takeOwnership))); } /** * Creates a FutureTask that will, upon running, execute the given Runnable, * and arrange that the get method will return the given result on successful * completion. * * @param runnable * The runnable task that the future will execute. * @param result * The result to return on successful completion. * @param takeOwnership * Boolean value indicating if the Executor now owns the pointer to the task. * * @throws NullPointerException if runnable is NULL. */ FutureTask(decaf::lang::Runnable* runnable, const T& result, bool takeOwnership = true) : sync(NULL) { if (runnable == NULL ) { throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__, "The Runnable pointer passed to the constructor was NULL"); } this->sync.reset(new FutureTaskSync(this, new FutureTaskAdapter(runnable, result, takeOwnership))); } virtual ~FutureTask() { } virtual bool isCancelled() const { return this->sync->innerIsCancelled(); } virtual bool isDone() const { return this->sync->innerIsDone(); } virtual bool cancel(bool mayInterruptIfRunning) { return this->sync->innerCancel(mayInterruptIfRunning); } virtual T get() { return this->sync->innerGet(); } virtual T get(long long timeout, const TimeUnit& unit) { return this->sync->innerGet(unit.toNanos(timeout)); } FutureTask* clone() { return new FutureTask(*this); } public: /** * Protected method invoked when this task transitions to state isDone * (whether normally or via cancellation). The default implementation * does nothing. Subclasses may override this method to invoke completion * callbacks or perform bookkeeping. Note that you can query status inside * the implementation of this method to determine whether this task has * been canceled. */ virtual void done() {} /** * Sets the result of this Future to the given value unless this future * has already been set or has been cancelled. This method is invoked * internally by the run method upon successful completion of * the computation. * * @param v * The value to return as the result of this Future. */ virtual void set(const T& result) { this->sync->innerSet(result); } /** * Causes this future to report an ExecutionException with the given * Exception as its cause, unless this Future has already been set or * has been canceled. This method is invoked internally by the run * method upon failure of the computation. * * @param error * The cause of failure that is thrown from run. */ virtual void setException(const decaf::lang::Exception& error) { this->sync->innerSetException(error); } virtual void run() { this->sync->innerRun(); } /** * Executes the computation without setting its result, and then resets * this Future to initial state, failing to do so if the computation * encounters an exception or is canceled. This is designed for use * with tasks that intrinsically execute more than once. * * @return true if successfully run and reset */ virtual bool runAndReset() { return this->sync->innerRunAndReset(); } public: FutureTask(const FutureTask& source) : RunnableFuture(), sync(source.sync) { } FutureTask& operator= (const FutureTask& source) { this->sync = source.sync; return *this; } }; }}} #endif /* _DECAF_UTIL_CONCURRENT_FUTURETASK_H_ */