diff --git a/openide.util/manifest.mf b/openide.util/manifest.mf
--- a/openide.util/manifest.mf
+++ b/openide.util/manifest.mf
@@ -1,5 +1,5 @@
Manifest-Version: 1.0
OpenIDE-Module: org.openide.util
OpenIDE-Module-Localizing-Bundle: org/openide/util/Bundle.properties
-OpenIDE-Module-Specification-Version: 8.1
+OpenIDE-Module-Specification-Version: 8.2
diff --git a/openide.util/src/org/openide/util/RequestProcessor.java b/openide.util/src/org/openide/util/RequestProcessor.java
--- a/openide.util/src/org/openide/util/RequestProcessor.java
+++ b/openide.util/src/org/openide/util/RequestProcessor.java
@@ -41,15 +41,33 @@
package org.openide.util;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
+import java.util.Set;
import java.util.Stack;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -154,10 +172,12 @@
* }
* }
*
- *
- * @author Petr Nejedly, Jaroslav Tulach
+ *
+ * Since org.openide.util
, implements
+ * {@link java.util.concurrent.ScheduledExecutorService}
+ * @author Petr Nejedly, Jaroslav Tulach, Tim Boudreau
*/
-public final class RequestProcessor implements Executor {
+public final class RequestProcessor implements ScheduledExecutorService {
/** the static instance for users that do not want to have own processor */
private static RequestProcessor DEFAULT = new RequestProcessor();
@@ -186,7 +206,7 @@
/** If the RP was stopped, this variable will be set, every new post()
* will throw an exception and no task will be processed any further */
- boolean stopped = false;
+ volatile boolean stopped = false;
/** The lock covering following four fields. They should be accessed
* only while having this lock held. */
@@ -585,6 +605,765 @@
}
}
+ /**
+ * {@inheritDoc}
+ * @throws an IllegalStateException if called on the
+ * {@linkplain #getDefault default request processor}
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public void shutdown() {
+ if (this == UNLIMITED) {
+ throw new IllegalStateException ("Cannot shut down the default " + //NOI18N
+ "request processor"); //NOI18N
+ }
+ stop();
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws an IllegalStateException if called on the
+ * {@linkplain #getDefault default request processor}
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public List shutdownNow() {
+ if (this == UNLIMITED) {
+ throw new IllegalStateException ("Cannot shut down the default " + //NOI18N
+ "request processor"); //NOI18N
+ }
+ //XXX more aggressive shutdown?
+ stop();
+ synchronized (processorLock) {
+ List result = new ArrayList(processors.size());
+ for (Processor p : processors) {
+ if (p != null && p.todo != null && p.todo.run != null) {
+ Runnable r = p.todo.run;
+ if (r instanceof RunnableWrapper) {
+ Runnable other = ((RunnableWrapper) r).getRunnable();
+ r = other == null ? r : other;
+ }
+ result.add(r);
+ }
+ }
+ return result;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public boolean isShutdown() {
+ return stopped;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public boolean isTerminated() {
+ boolean result = true;
+ Set set = collectProcessors(new HashSet());
+ for (Processor p : set) {
+ if (p.isAlive() && p.belongsTo(this)) {
+ result = false;
+ break;
+ }
+ }
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ Parameters.notNull("unit", unit); //NOI18N
+ long timeoutMillis = unit.convert(timeout, TimeUnit.MILLISECONDS);
+ boolean result = stopped;
+ long doneTime = System.currentTimeMillis() + timeoutMillis;
+ Set procs = new HashSet();
+outer: do {
+ procs = collectProcessors(procs);
+ if (procs.isEmpty()) {
+ return true;
+ }
+ for (Processor p : procs) {
+ long remaining = doneTime - System.currentTimeMillis();
+ if (remaining <= 0) {
+ result = collectProcessors(procs).isEmpty();
+ break outer;
+ }
+ if (p.belongsTo(this)) {
+ p.join(remaining);
+ }
+ result = !p.isAlive() || !p.belongsTo(this);
+ }
+ procs.clear();
+ } while (!procs.isEmpty());
+ return result;
+ }
+
+ private Set collectProcessors (Set procs) {
+ procs.clear();
+ synchronized (processorLock) {
+ for (Processor p : processors) {
+ if (p.belongsTo(this)) {
+ procs.add(p);
+ }
+ }
+ }
+ return procs;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Note: If the passed {@link java.util.concurrent.Callable} implements
+ * {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()}
+ * method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked.
+ * If Cancellable.cancel()
returns false, then the job will not be
+ * cancelled.
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public Future submit(Callable task) {
+ Parameters.notNull("task", task); //NOI18N
+ if (stopped) {
+ throw new RejectedExecutionException("Request Processor already " + //NOI18N
+ "stopped"); //NOI18N
+ }
+ RPFutureTask result = new RPFutureTask(task);
+ Task t = create(result);
+ result.setTask(t);
+ t.schedule(0);
+ return result;
+ }
+ /**
+ * {@inheritDoc}
+ * Note: If the passed {@link java.lang.Runnable} implements
+ * {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()}
+ * method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked.
+ * If Cancellable.cancel()
returns false, then the job will not be
+ * cancelled.
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public Future submit(Runnable task, T predefinedResult) {
+ Parameters.notNull("task", task); //NOI18N
+ if (stopped) {
+ throw new RejectedExecutionException("Request Processor already " + //NOI18N
+ "stopped"); //NOI18N
+ }
+ RPFutureTask result = new RPFutureTask(task, predefinedResult);
+ Task t = create(result);
+ result.setTask(t);
+ t.schedule(0);
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Note: If the passed {@link java.lang.Runnable} implements
+ * {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()}
+ * method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked.
+ * If Cancellable.cancel()
returns false, then the job will not be
+ * cancelled.
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public Future> submit(Runnable task) {
+ return this.submit (task, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks) throws InterruptedException {
+ Parameters.notNull("tasks", tasks); //NOI18N
+ List> result = new ArrayList>(tasks.size());
+ CountDownLatch wait = new CountDownLatch(tasks.size());
+ for (Callable c : tasks) {
+ if (c == null) {
+ throw new NullPointerException ("Contains null tasks: " + //NOI18N
+ tasks);
+ }
+ Callable delegate = new WaitableCallable(c, wait);
+ result.add (submit(delegate));
+ }
+ wait.await();
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Executes the given tasks, returning a list of Futures holding their
+ * status and results when all complete or the timeout expires, whichever
+ * happens first.
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ Parameters.notNull("unit", unit); //NOI18N
+ Parameters.notNull("tasks", tasks); //NOI18N
+ CountDownLatch wait = new CountDownLatch(tasks.size());
+ List> result = new ArrayList>(tasks.size());
+ for (Callable c : tasks) {
+ if (c == null) {
+ throw new NullPointerException ("Contains null tasks: " + tasks); //NOI18N
+ }
+ Callable delegate = new WaitableCallable(c, wait);
+ result.add (submit(delegate));
+ }
+ if (!wait.await(timeout, unit)) {
+ for (Future f : result) {
+ RPFutureTask> ft = (RPFutureTask>) f;
+ ft.cancel(true);
+ }
+ }
+ return result;
+ }
+ /**
+ * {@inheritDoc}
+ *
+ * Executes the given tasks, returning the result of one which has
+ * completed and cancelling any incomplete tasks.
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public T invokeAny(Collection extends Callable> tasks) throws InterruptedException, ExecutionException {
+ Parameters.notNull("tasks", tasks); //NOI18N
+ CountDownLatch wait = new CountDownLatch(1);
+ List> result = new ArrayList>(tasks.size());
+ AtomicReference ref = new AtomicReference();
+ try {
+ for (Callable c : tasks) {
+ if (c == null) {
+ throw new NullPointerException ("Contains null tasks: " + //NOI18N
+ tasks);
+ }
+ Callable delegate = new WaitableCallable(c, ref, wait);
+ result.add (submit(delegate));
+ }
+ wait.await();
+ } finally {
+ for (Future f : result) {
+ RPFutureTask> ft = (RPFutureTask>) f;
+ ft.cancel(true);
+ }
+ }
+ return ref.get();
+ }
+ /**
+ * {@inheritDoc}
+ *
+ * Executes the given tasks, returning a list of Futures holding their
+ * status and results when all complete or the timeout expires, whichever
+ * happens first.
+ * @param The result type
+ * @param tasks A collection of callables
+ * @param timeout The maximum time to wait for completion, in the specified time units
+ * @param unit The time unit
+ * @return A list of futures
+ * @throws InterruptedException if the timeout expires or execution is interrupted
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public T invokeAny(Collection extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ Parameters.notNull("unit", unit); //NOI18N
+ Parameters.notNull("tasks", tasks); //NOI18N
+ CountDownLatch wait = new CountDownLatch(1);
+ List> result = new ArrayList>(tasks.size());
+ AtomicReference ref = new AtomicReference();
+ try {
+ for (Callable c : tasks) {
+ if (c == null) {
+ throw new NullPointerException ("Contains null tasks: " + //NOI18N
+ tasks);
+ }
+ Callable delegate = new WaitableCallable(c, ref, wait);
+ result.add (submit(delegate));
+ }
+ wait.await(timeout, unit);
+ } finally {
+ for (Future f : result) {
+ RPFutureTask ft = (RPFutureTask) f;
+ ft.cancel(true);
+ }
+ }
+ return ref.get();
+ }
+
+ /**
+ * {@inheritDoc}
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) {
+ Parameters.notNull("command", command); //NOI18N
+ Parameters.notNull("unit", unit); //NOI18N
+ if (delay < 0) {
+ throw new IllegalArgumentException ("Negative delay: " + delay);
+ }
+ if (stopped) {
+ throw new RejectedExecutionException("Request Processor already stopped"); //NOI18N
+ }
+ long delayMillis = unit.convert(delay, TimeUnit.MILLISECONDS);
+ ScheduledRPFutureTask result = new ScheduledRPFutureTask(command, null, delayMillis);
+ Task t = create(result);
+ result.setTask(t);
+ t.schedule(delayMillis);
+ return result;
+ }
+ /**
+ * {@inheritDoc}
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) {
+ Parameters.notNull("unit", unit); //NOI18N
+ Parameters.notNull("callable", callable); //NOI18N
+ if (delay < 0) {
+ throw new IllegalArgumentException ("Negative delay: " + delay);
+ }
+ if (stopped) {
+ throw new RejectedExecutionException("Request Processor already " + //NOI18N
+ "stopped"); //NOI18N
+ }
+ long delayMillis = unit.convert(delay, TimeUnit.MILLISECONDS);
+ ScheduledRPFutureTask result = new ScheduledRPFutureTask(callable, delayMillis);
+ Task t = create(result);
+ result.setTask(t);
+ t.schedule(delayMillis);
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Schedules a runnable which will run with a given frequency, regardless
+ * of how long execution takes, with the exception that if execution takes
+ * longer than the specified delay, execution will be delayed but will
+ * never be run on two threads concurrently.
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return scheduleFixed(command, initialDelay, period, unit, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Schedules a runnable which will run repeatedly after the specified initial
+ * delay, with the specified delay between the completion of one run and
+ * the start of the next.
+ * @since org.openide.util 8.2
+ */
+ @Override
+ public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return scheduleFixed(command, initialDelay, delay, unit, true);
+ }
+
+ /**
+ * Create a Future which can be rescheduled and run multiple times. Such
+ * repeatable tasks are useful, for example, for cases where a GUI component
+ * should perform some background initialization when first shown, but
+ * should cancel that initialization if hidden; or cases where expensive
+ * background processing should be postponed on user input, but run after
+ * a certain amount of idle time.
+ *
+ * Note that this method creates a {@link ReschedulableFuture} —
+ * it does not enqueue it to run. To enqueue it to run, call
+ * the returned object's {@link ReschedulableFuture.schedule(long)} method.
+ *
+ * The returned Future may be cancelled, and then rescheduled to run
+ * later; if it's {@link ReschedulableFuture.schedule(long)} method is
+ * after it has been scheduled, but before it has run, it will run after
+ * the new timeout.
+ *
+ * The returned {@link ReschedulableFuture} will return false
+ * from isDone() until it has run for the first time.
+ *
+ * Any call to one of the schedule()
methods will reset the
+ * cancelled state to false.
+ *
+ * @param runnable A runnable
+ * @return A future which can be scheduled, cancelled and rescheduled as
+ * desired.
+ */
+ public ReschedulableFuture> createReschedulable(Runnable runnable) {
+ return createReschedulable(runnable, false);
+ }
+
+ /**
+ * Create a Future which can be rescheduled and run multiple times. Such
+ * repeatable tasks are useful, for example, for cases where a GUI component
+ * should perform some background initialization when first shown, but
+ * should cancel that initialization if hidden; or cases where expensive
+ * background processing should be postponed on user input, but run after
+ * a certain amount of idle time.
+ *
+ * Note that this method creates a {@link ReschedulableFuture} —
+ * it does not enqueue it to run. To enqueue it to run, call
+ * the returned object's {@link ReschedulableFuture.schedule(long)} method.
+ *
+ * The returned Future may be cancelled, and then rescheduled to run
+ * later; if it's {@link ReschedulableFuture.schedule(long)} method is
+ * after it has been scheduled, but before it has run, it will run after
+ * the new timeout.
+ *
+ * The returned {@link ReschedulableFuture} will return the value of the
+ * passed argument initiallyFinished
+ * from isDone()
until it has run for the first time.
+ *
+ * Any call to one of the schedule()
methods will reset the
+ * cancelled state to false.
+ *
+ * Calling get()
on the returned Future
will
+ * block until the next time the task completes, or until it is cancelled.
+ *
+ * @param runnable A runnable
+ * @param initiallyFinished If true, isDone()
will initially
+ * return true.
+ * @return A future which can be scheduled, cancelled and rescheduled as
+ * desired.
+ */
+ public ReschedulableFuture> createReschedulable(Runnable runnable, boolean initiallyFinished) {
+ Task task = create(runnable, initiallyFinished);
+ ReschedulableFutureImpl impl = new ReschedulableFutureImpl(runnable, task);
+ return impl;
+ }
+
+ private ScheduledFuture> scheduleFixed (Runnable command, long initialDelay, long period, TimeUnit unit, boolean fixedDelay) {
+ Parameters.notNull("unit", unit); //NOI18N
+ Parameters.notNull("command", command); //NOI18N
+ if (period < 0) {
+ throw new IllegalArgumentException ("Negative delay: " + period); //NOI18N
+ }
+ if (initialDelay < 0) {
+ throw new IllegalArgumentException ("Negative initialDelay: " //NOI18N
+ + initialDelay);
+ }
+ if (stopped) {
+ throw new RejectedExecutionException("Request Processor already " + //NOI18N
+ "stopped"); //NOI18N
+ }
+ long initialDelayMillis = unit.convert(initialDelay, TimeUnit.MILLISECONDS);
+ long periodMillis = unit.convert(period, TimeUnit.MILLISECONDS);
+
+ TaskFutureWrapper wrap = fixedDelay ?
+ new FixedDelayTask(command, initialDelayMillis, periodMillis) :
+ new FixedRateTask(command, initialDelay, periodMillis);
+ Task t = create(wrap);
+ wrap.t = t;
+ t.cancelled = wrap.cancelled;
+ t.schedule (initialDelayMillis);
+
+ return wrap;
+ }
+
+ private static abstract class TaskFutureWrapper implements ScheduledFuture, Runnable, RunnableWrapper {
+ volatile Task t;
+ protected final Runnable toRun;
+ protected final long initialDelay;
+ protected final long period;
+ final AtomicBoolean cancelled = new AtomicBoolean();
+ TaskFutureWrapper(Runnable run, long initialDelay, long period) {
+ this.toRun = run;
+ this.initialDelay = initialDelay;
+ this.period = period;
+ }
+
+ @Override
+ public final Runnable getRunnable() {
+ return toRun;
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ long other = o.getDelay(TimeUnit.MILLISECONDS);
+ long ours = getDelay(TimeUnit.MILLISECONDS);
+ //Might overflow on, say, ms compared to Long.MAX_VALUE, TimeUnit.DAYS
+ return (int) (ours - other);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ boolean result = true;
+ if (toRun instanceof Cancellable) {
+ result = ((Cancellable) toRun).cancel();
+ }
+ if (result) {
+ //will invoke cancelled.set(true)
+ result = t.cancel(mayInterruptIfRunning);
+ }
+ return result;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancelled.get();
+ }
+
+ @Override
+ public boolean isDone() {
+ return cancelled.get() || t.isFinished();
+ }
+
+ @Override
+ public Void get() throws InterruptedException, ExecutionException {
+ if (cancelled.get()) {
+ throw new CancellationException();
+ }
+ t.waitFinished();
+ if (cancelled.get()) {
+ throw new CancellationException();
+ }
+ return null;
+ }
+
+ @Override
+ public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ if (cancelled.get()) {
+ throw new CancellationException();
+ }
+ long millis = unit.convert(timeout, TimeUnit.MILLISECONDS);
+ t.waitFinished(millis);
+ if (cancelled.get()) {
+ throw new CancellationException();
+ }
+ return null;
+ }
+ }
+
+ private static final class FixedRateTask extends TaskFutureWrapper {
+ private final Object runLock = new Object();
+ private final Object timeLock = new Object();
+ //must be accessed holding timeLock
+ private int runCount;
+ private long nextRunTime;
+ private long start = Long.MIN_VALUE;
+ volatile boolean firstRun = true;
+ FixedRateTask (Runnable run, long initialDelay, long period) {
+ super (run, initialDelay, period);
+ }
+
+ @Override
+ public void run() {
+ if (firstRun) {
+ synchronized (timeLock) {
+ start = System.currentTimeMillis();
+ firstRun = false;
+ }
+ }
+ try {
+ synchronized(runLock) {
+ toRun.run();
+ }
+ } catch (RuntimeException e) {
+ cancel(true);
+ throw e;
+ }
+ reschedule();
+ }
+
+ private void reschedule() {
+ //All access to nextRunTime & runCount under lock.
+ long interval;
+ synchronized (timeLock) {
+ nextRunTime = start + (initialDelay + period * runCount);
+ runCount++;
+ interval = Math.max(0, nextRunTime - System.currentTimeMillis());
+ }
+ boolean canContinue = !cancelled.get() && !Thread.currentThread().isInterrupted();
+ if (canContinue) {
+ t.schedule(interval);
+ }
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ if (isCancelled()) {
+ return Long.MAX_VALUE;
+ }
+ long delay;
+ synchronized (timeLock) {
+ delay = Math.min(0, nextRunTime - System.currentTimeMillis());
+ }
+ return unit.convert(delay, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private static final class FixedDelayTask extends TaskFutureWrapper {
+ private volatile boolean firstRun = true;
+ private final AtomicLong nextRunTime = new AtomicLong();
+ FixedDelayTask(Runnable run, long initialDelay, long period) {
+ super (run, initialDelay, period);
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ long next = nextRunTime.get();
+ return unit.convert (next - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void run() {
+ if (!fini()) {
+ toRun.run();
+ }
+ if (!fini()) {
+ reschedule();
+ }
+ }
+
+ private boolean fini() {
+ boolean result = cancelled.get() || Thread.currentThread().isInterrupted();
+ return result;
+ }
+
+ private void reschedule() {
+ long delay;
+ if (firstRun) {
+ delay = initialDelay;
+ } else {
+ delay = period;
+ }
+ nextRunTime.set(System.currentTimeMillis() + delay);
+ firstRun = false;
+ if (!fini()) {
+ t.schedule((int) delay);
+ }
+ }
+ }
+
+ private interface RunnableWrapper {
+ Runnable getRunnable();
+ }
+
+ private static final class WaitableCallable implements Callable, Cancellable {
+ private final CountDownLatch countdown;
+ private final Callable delegate;
+ private final AtomicReference ref;
+ private volatile boolean failed;
+ WaitableCallable(Callable delegate, CountDownLatch countdown) {
+ this (delegate, null, countdown);
+ }
+
+ WaitableCallable(Callable delegate, AtomicReference ref, CountDownLatch countdown) {
+ this.delegate = delegate;
+ this.countdown = countdown;
+ this.ref = ref;
+ }
+
+ boolean failed() {
+ return failed;
+ }
+
+ @Override
+ public T call() throws Exception {
+ try {
+ T result = delegate.call();
+ if (ref != null) {
+ ref.set(result);
+ }
+ return result;
+ } catch (RuntimeException e) {
+ failed = true;
+ throw e;
+ } catch (Error e) {
+ failed = true;
+ throw e;
+ } finally {
+ if (!failed || ref == null) {
+ countdown.countDown();
+ }
+ }
+ }
+
+ @Override
+ public boolean cancel() {
+ return delegate instanceof Cancellable ? ((Cancellable) delegate).cancel() : true;
+ }
+ }
+
+ private static class RPFutureTask extends FutureTask implements RunnableWrapper {
+ protected volatile Task task;
+ private final Runnable runnable;
+ private final Cancellable cancellable;
+ RPFutureTask(Callable c) {
+ super (c);
+ this.runnable = null;
+ this.cancellable = c instanceof Cancellable ? (Cancellable) c : null;
+ }
+
+ RPFutureTask(Runnable r, T result) {
+ super (r, result);
+ this.runnable = r;
+ this.cancellable = r instanceof Cancellable ? (Cancellable) r : null;
+ }
+
+ void setTask(Task task) {
+ this.task = task;
+ }
+
+ RPFutureTask(Callable c, T predefinedResult) {
+ this (c);
+ set(predefinedResult);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ boolean result = cancellable == null ? true : cancellable.cancel();
+ if (result) {
+ boolean taskCancelled = task.cancel();
+ boolean superCancel = super.cancel(mayInterruptIfRunning); //must call both!
+ result = taskCancelled && superCancel;
+ }
+ return result;
+ }
+
+ @Override
+ public Runnable getRunnable() {
+ return this.runnable;
+ }
+ }
+
+ private static final class ScheduledRPFutureTask extends RPFutureTask implements ScheduledFuture {
+ protected final long delayMillis;
+ ScheduledRPFutureTask(Callable c, long delayMillis) {
+ super (c);
+ this.delayMillis = delayMillis;
+ }
+
+ ScheduledRPFutureTask(Runnable r, T result, long delayMillis) {
+ super (r, result);
+ this.delayMillis = delayMillis;
+ }
+
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return TimeUnit.MILLISECONDS.convert(delayMillis, unit);
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ //Can overflow, if one delay is, say, days, and the other, microseconds
+ long otherDelayMillis = o.getDelay(TimeUnit.MILLISECONDS);
+ return (int) (delayMillis - otherDelayMillis);
+ }
+ }
+
private class EnqueueTask extends TimerTask {
Item itm;
@@ -610,6 +1389,7 @@
private int priority = Thread.MIN_PRIORITY;
private long time = 0;
private Thread lastThread = null;
+ private AtomicBoolean cancelled;
/** @param run runnable to start
* @param delay amount of millis to wait
@@ -676,6 +1456,10 @@
* @param delay time in milliseconds to wait (starting from now)
*/
public void schedule(int delay) {
+ schedule((long) delay);
+ }
+
+ void schedule(long delay) {
if (stopped) {
throw new IllegalStateException("RequestProcessor already stopped!"); // NOI18N
}
@@ -685,6 +1469,9 @@
final Item localItem;
synchronized (processorLock) {
+ if (cancelled != null) {
+ cancelled.set(false);
+ }
notifyRunning();
if (item != null) {
@@ -747,6 +1534,49 @@
}
}
+ /**
+ * Implementation of cancel for use with Future objects, to guarantee
+ * that the thread will be interrupted, no matter what the setting
+ * on the owning RP.
+ * @param interrupt If true, the thread should be interrupted
+ * @return true if cancellation occurred
+ */
+ boolean cancel (boolean interrupt) {
+ synchronized (processorLock) {
+ if (cancelled != null) {
+ boolean wasCancelled = !cancelled.getAndSet(true);
+ if (wasCancelled) {
+ return false;
+ }
+ }
+ boolean success;
+
+ if (item == null) {
+ success = false;
+ } else {
+ Processor p = item.getProcessor();
+ success = item.clear(null);
+
+ if (p != null) {
+ if (interrupt) {
+ success = p.interrupt(this, RequestProcessor.this);
+ } else {
+ //despite its name, will not actually interrupt
+ //unless the RP specifies that it should
+ p.interruptTask(this, RequestProcessor.this);
+ }
+ if (success) {
+ item = null;
+ }
+ }
+ }
+ if (success) {
+ notifyFinished(); // mark it as finished
+ }
+ return success;
+ }
+ }
+
/** Current priority of the task.
* @return the priority level (see e.g. {@link Thread#NORM_PRIORITY}
*/
@@ -1062,6 +1892,12 @@
}
}
+ boolean belongsTo(RequestProcessor r) {
+ synchronized (lock) {
+ return source == r;
+ }
+ }
+
/**
* The method that will repeatedly wait for a request and perform it.
*/
@@ -1190,6 +2026,14 @@
}
}
+ boolean interrupt (Task t, RequestProcessor src) {
+ if (t != todo) {
+ return false;
+ }
+ interrupt();
+ return true;
+ }
+
/** @see "#20467" */
private static void doNotify(RequestProcessor.Task todo, Throwable ex) {
if (SLOW && todo.item.message == null) {
diff --git a/openide.util/src/org/openide/util/ReschedulableFuture.java b/openide.util/src/org/openide/util/ReschedulableFuture.java
new file mode 100644
--- /dev/null
+++ b/openide.util/src/org/openide/util/ReschedulableFuture.java
@@ -0,0 +1,75 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ *
+ * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
+ *
+ * The contents of this file are subject to the terms of either the GNU
+ * General Public License Version 2 only ("GPL") or the Common
+ * Development and Distribution License("CDDL") (collectively, the
+ * "License"). You may not use this file except in compliance with the
+ * License. You can obtain a copy of the License at
+ * http://www.netbeans.org/cddl-gplv2.html
+ * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the
+ * specific language governing permissions and limitations under the
+ * License. When distributing the software, include this License Header
+ * Notice in each file and include the License file at
+ * nbbuild/licenses/CDDL-GPL-2-CP. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the GPL Version 2 section of the License file that
+ * accompanied this code. If applicable, add the following below the
+ * License Header, with the fields enclosed by brackets [] replaced by
+ * your own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * If you wish your version of this file to be governed by only the CDDL
+ * or only the GPL Version 2, indicate your decision by adding
+ * "[Contributor] elects to include this software in this distribution
+ * under the [CDDL or GPL Version 2] license." If you do not indicate a
+ * single choice of license, a recipient has the option to distribute
+ * your version of this file under either the CDDL, the GPL Version 2 or
+ * to extend the choice of license to its licensees as provided above.
+ * However, if you add GPL Version 2 code and therefore, elected the GPL
+ * Version 2 license, then the option applies only if the new code is
+ * made subject to such option by the copyright holder.
+ *
+ * Contributor(s):
+ *
+ * Portions Copyrighted 2010 Sun Microsystems, Inc.
+ */
+
+package org.openide.util;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Future which can be run more than once.
+ *
+ * @author Tim Boudreau
+ */
+public interface ReschedulableFuture extends ScheduledFuture{
+ /**
+ * Schedule this future to be run. If this task is already enqueued to run
+ * after some delay, the task will now run after the passed delay, regardless
+ * of whether it is sooner or later than the original delay. If this
+ * task has been cancelled, it is re-enqueued.
+ *
+ * @param delay The delay
+ * @param unit The time unit of the delay
+ * @return A ScheduledFuture (which may not == this) which can be
+ * monitored
+ */
+ ScheduledFuture schedule (long delay, TimeUnit unit);
+ /**
+ * Schedule this future to be run. If this task is already enqueued to run
+ * after some delay, the task will now run after the passed delay, regardless
+ * of whether it is sooner or later than the original delay. If this
+ * task has been cancelled, it is re-enqueued.
+ *
+ * @param delayMillis The delay in milliseconds
+ * @param unit The time unit of the delay
+ * @return A ScheduledFuture (which may not == this) which can be
+ * monitored
+ */
+ ScheduledFuture schedule (long delayMillis);
+}
diff --git a/openide.util/src/org/openide/util/ReschedulableFutureImpl.java b/openide.util/src/org/openide/util/ReschedulableFutureImpl.java
new file mode 100644
--- /dev/null
+++ b/openide.util/src/org/openide/util/ReschedulableFutureImpl.java
@@ -0,0 +1,131 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ *
+ * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
+ *
+ * The contents of this file are subject to the terms of either the GNU
+ * General Public License Version 2 only ("GPL") or the Common
+ * Development and Distribution License("CDDL") (collectively, the
+ * "License"). You may not use this file except in compliance with the
+ * License. You can obtain a copy of the License at
+ * http://www.netbeans.org/cddl-gplv2.html
+ * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the
+ * specific language governing permissions and limitations under the
+ * License. When distributing the software, include this License Header
+ * Notice in each file and include the License file at
+ * nbbuild/licenses/CDDL-GPL-2-CP. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the GPL Version 2 section of the License file that
+ * accompanied this code. If applicable, add the following below the
+ * License Header, with the fields enclosed by brackets [] replaced by
+ * your own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * If you wish your version of this file to be governed by only the CDDL
+ * or only the GPL Version 2, indicate your decision by adding
+ * "[Contributor] elects to include this software in this distribution
+ * under the [CDDL or GPL Version 2] license." If you do not indicate a
+ * single choice of license, a recipient has the option to distribute
+ * your version of this file under either the CDDL, the GPL Version 2 or
+ * to extend the choice of license to its licensees as provided above.
+ * However, if you add GPL Version 2 code and therefore, elected the GPL
+ * Version 2 license, then the option applies only if the new code is
+ * made subject to such option by the copyright holder.
+ *
+ * Contributor(s):
+ *
+ * Portions Copyrighted 2010 Sun Microsystems, Inc.
+ */
+
+package org.openide.util;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ * @author Tim Boudreau
+ */
+final class ReschedulableFutureImpl implements ReschedulableFuture {
+ private final RequestProcessor.Task task;
+ private final AtomicBoolean cancelled = new AtomicBoolean();
+ private final Runnable toRun;
+ ReschedulableFutureImpl(Runnable run, RequestProcessor.Task task) {
+ this.task = task;
+ this.toRun = run;
+ }
+
+ @Override
+ public ScheduledFuture schedule(long delay, TimeUnit unit) {
+ schedule(unit.convert(delay, TimeUnit.MILLISECONDS));
+ return this;
+ }
+
+ @Override
+ public ScheduledFuture schedule(long delayMillis) {
+ cancelled.set(false);
+ task.schedule(delayMillis);
+ return this;
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return unit.convert (task.getDelay(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ long val = getDelay(TimeUnit.MILLISECONDS);
+ long other = o.getDelay(TimeUnit.MILLISECONDS);
+ return (int)(val - other);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (cancelled.compareAndSet(false, true)) {
+ boolean jobCancelled = toRun instanceof Cancellable ? ((Cancellable) toRun).cancel() : true;
+ boolean taskCancelled = task.cancel(mayInterruptIfRunning);
+ return jobCancelled && taskCancelled;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancelled.get();
+ }
+
+ @Override
+ public boolean isDone() {
+ return cancelled.get() || task.isFinished();
+ }
+
+ @Override
+ public Void get() throws InterruptedException, ExecutionException {
+ if (cancelled.get()) {
+ throw new CancellationException();
+ }
+ task.waitFinished();
+ if (cancelled.get()) {
+ throw new CancellationException();
+ }
+ return null;
+ }
+
+ @Override
+ public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ if (cancelled.get()) {
+ throw new CancellationException();
+ }
+ task.waitFinished(unit.convert(timeout, TimeUnit.MILLISECONDS));
+ if (cancelled.get()) {
+ throw new CancellationException();
+ }
+ return null;
+ }
+}
diff --git a/openide.util/test/unit/src/org/openide/util/RequestProcessor180386Test.java b/openide.util/test/unit/src/org/openide/util/RequestProcessor180386Test.java
new file mode 100644
--- /dev/null
+++ b/openide.util/test/unit/src/org/openide/util/RequestProcessor180386Test.java
@@ -0,0 +1,1500 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ *
+ * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
+ *
+ * The contents of this file are subject to the terms of either the GNU
+ * General Public License Version 2 only ("GPL") or the Common
+ * Development and Distribution License("CDDL") (collectively, the
+ * "License"). You may not use this file except in compliance with the
+ * License. You can obtain a copy of the License at
+ * http://www.netbeans.org/cddl-gplv2.html
+ * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the
+ * specific language governing permissions and limitations under the
+ * License. When distributing the software, include this License Header
+ * Notice in each file and include the License file at
+ * nbbuild/licenses/CDDL-GPL-2-CP. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the GPL Version 2 section of the License file that
+ * accompanied this code. If applicable, add the following below the
+ * License Header, with the fields enclosed by brackets [] replaced by
+ * your own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * If you wish your version of this file to be governed by only the CDDL
+ * or only the GPL Version 2, indicate your decision by adding
+ * "[Contributor] elects to include this software in this distribution
+ * under the [CDDL or GPL Version 2] license." If you do not indicate a
+ * single choice of license, a recipient has the option to distribute
+ * your version of this file under either the CDDL, the GPL Version 2 or
+ * to extend the choice of license to its licensees as provided above.
+ * However, if you add GPL Version 2 code and therefore, elected the GPL
+ * Version 2 license, then the option applies only if the new code is
+ * made subject to such option by the copyright holder.
+ *
+ * Contributor(s):
+ *
+ * Portions Copyrighted 2010 Sun Microsystems, Inc.
+ */
+package org.openide.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import junit.framework.Test;
+import org.netbeans.junit.NbTestCase;
+import org.netbeans.junit.NbTestSuite;
+import org.netbeans.junit.RandomlyFails;
+
+/**
+ *
+ * @author Tim Boudreau
+ */
+public class RequestProcessor180386Test extends NbTestCase {
+
+ public RequestProcessor180386Test(java.lang.String testName) {
+ super(testName);
+ }
+
+ public static Test suite() {
+ Test t = null;
+ if (t == null) {
+ t = new NbTestSuite(RequestProcessor180386Test.class);
+ }
+ return t;
+ }
+
+ public void testSubmit() throws Exception {
+ class C implements Callable {
+
+ volatile boolean hasRun;
+
+ @Override
+ public String call() throws Exception {
+ String result = "Hello";
+ hasRun = true;
+ return result;
+ }
+ }
+ C c = new C();
+ Future f = RequestProcessor.getDefault().submit(c);
+ assertEquals("Hello", f.get());
+ assertTrue(c.hasRun);
+
+ class R implements Runnable {
+
+ volatile boolean hasRun;
+
+ @Override
+ public void run() {
+ hasRun = true;
+ }
+ }
+ R r = new R();
+ f = RequestProcessor.getDefault().submit(r, "Goodbye");
+ assertEquals("Goodbye", f.get());
+ assertTrue(r.hasRun);
+ }
+
+ public void testSomeTasksNotRunIfShutDown() throws Exception {
+ final Object lock = new Object();
+ int count = 10;
+ final CountDownLatch waitAllLaunched = new CountDownLatch(count);
+ final CountDownLatch waitOneFinished = new CountDownLatch(1);
+ RequestProcessor rp = new RequestProcessor("TestRP", count * 2);
+ class R implements Runnable {
+
+ volatile boolean hasStarted;
+ volatile boolean hasFinished;
+
+ @Override
+ public void run() {
+ hasStarted = true;
+ waitAllLaunched.countDown();
+ synchronized (lock) {
+ try {
+ lock.wait();
+ if (Thread.interrupted()) {
+ return;
+ }
+ } catch (InterruptedException ex) {
+ return;
+ } finally {
+ waitOneFinished.countDown();
+ }
+ hasFinished = true;
+ }
+ }
+ }
+ Set> s = new HashSet>();
+ Set rs = new HashSet();
+ for (int i = 0; i < count; i++) {
+ String currName = "Runnable " + i;
+ R r = new R();
+ rs.add(r);
+ s.add(rp.submit(r, currName));
+ }
+ waitAllLaunched.await();
+ synchronized (lock) {
+ //Notify just one thread
+ lock.notify();
+ }
+ waitOneFinished.await();
+ List notRun = rp.shutdownNow();
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ boolean allFinished = true;
+ int finishedCount = 0;
+ for (R r : rs) {
+ assertTrue(r.hasStarted);
+ allFinished &= r.hasFinished;
+ if (r.hasFinished) {
+ finishedCount++;
+ }
+ }
+ assertFalse("All tasks should not have completed", allFinished);
+ assertTrue("At least one task shall complete", finishedCount >= 1);
+ assertFalse(notRun.isEmpty());
+ assertTrue(rp.isShutdown());
+ //Technically not provable due to "spurious wakeups"
+ // assertEquals (1, finishedCount);
+
+ try {
+ RequestProcessor.getDefault().shutdown();
+ fail("Should not be able to shutdown() default RP");
+ } catch (Exception e) {
+ }
+ try {
+ RequestProcessor.getDefault().shutdownNow();
+ fail("Should not be able to shutdownNow() default RP");
+ } catch (Exception e) {
+ }
+ for (Runnable nr : notRun) {
+ assertTrue ("Shutdown is not returning submitted runnables - got a "
+ + nr.getClass().getName() + " instead of " +
+ R.class.getName(), nr.getClass() == R.class);
+ }
+ }
+
+ public void testAwaitTermination() throws Exception {
+ int count = 20;
+ final Object lock = new Object();
+ final CountDownLatch waitAllLaunched = new CountDownLatch(count);
+ final CountDownLatch waitAll = new CountDownLatch(count);
+ final RequestProcessor rp = new RequestProcessor("TestRP", count);
+ class R implements Runnable {
+
+ volatile boolean hasStarted;
+ volatile boolean hasFinished;
+
+ @Override
+ public void run() {
+ hasStarted = true;
+ waitAllLaunched.countDown();
+ synchronized (lock) {
+ try {
+ lock.wait();
+ if (Thread.interrupted()) {
+ return;
+ }
+ } catch (InterruptedException ex) {
+ return;
+ } finally {
+ hasFinished = true;
+ waitAll.countDown();
+ }
+ }
+ }
+ }
+ Set> s = new HashSet>();
+ Set rs = new HashSet();
+ for (int i = 0; i < count; i++) {
+ String currName = "Runnable " + i;
+ R r = new R();
+ rs.add(r);
+ s.add(rp.submit(r, currName));
+ }
+ waitAllLaunched.await();
+ synchronized (lock) {
+ //Notify just one thread
+ lock.notifyAll();
+ }
+ rp.shutdown();
+ boolean awaitTermination = rp.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ assertTrue(awaitTermination);
+ assertTrue(rp.isShutdown());
+ assertTrue(rp.isTerminated());
+ }
+
+ @RandomlyFails
+ public void testAwaitTerminationWaitsForNewlyAddedThreads() throws Exception {
+ final RequestProcessor rp = new RequestProcessor("testAwaitTerminationWaitsForNewlyAddedThreads", 50, false);
+ int count = 30;
+ final CountDownLatch waitLock = new CountDownLatch(1);
+ class R implements Runnable {
+ boolean done;
+ @Override
+ public void run() {
+ try {
+ waitLock.await();
+ } catch (InterruptedException ex) {
+ done = true;
+ } finally {
+ done = true;
+ }
+ }
+ }
+ Set rs = new HashSet();
+ for (int i= 0; i < count; i++) {
+ R r = new R();
+ rs.add(r);
+ rp.submit(r);
+ }
+ final CountDownLatch shutdownBegun = new CountDownLatch(1);
+ Runnable shutdowner = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(1000);
+ rp.shutdown();
+ shutdownBegun.countDown();
+ } catch (InterruptedException ex) {
+ Exceptions.printStackTrace(ex);
+ }
+ }
+ };
+ waitLock.countDown();
+ new Thread(shutdowner).start();
+ assertTrue(rp.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
+ Thread.sleep (600);
+ assertTrue (rp.isTerminated());
+ }
+
+ public void testInvokeAll() throws Exception {
+ int count = 20;
+ final CountDownLatch waitAll = new CountDownLatch(count);
+ final RequestProcessor rp = new RequestProcessor("TestRP", count);
+ try {
+ class C implements Callable {
+
+ private final String result;
+ volatile boolean ran;
+
+ C(String result) {
+ this.result = result;
+ }
+
+ @Override
+ public String call() throws Exception {
+ ran = true;
+ waitAll.countDown();
+ return result;
+ }
+ }
+ List callables = new ArrayList(count);
+ List> fs;
+ Set names = new HashSet(count);
+ for (int i = 0; i < count; i++) {
+ String name = "R" + i;
+ names.add(name);
+ C c = new C(name);
+ callables.add(c);
+ }
+ fs = rp.invokeAll(callables);
+
+ assertNotNull(fs);
+ waitAll.await();
+ assertEquals(0, waitAll.getCount());
+ for (Future f : fs) {
+ assertTrue (f.isDone());
+ }
+ for (C c : callables) {
+ assertTrue (c.ran);
+ }
+ Set s = new HashSet(count);
+ for (Future f : fs) {
+ s.add(f.get());
+ }
+ assertEquals(names, s);
+ } finally {
+ rp.stop();
+ }
+ }
+
+ public void testInvokeAllWithTimeout() throws Exception {
+ int count = 20;
+ final CountDownLatch blocker = new CountDownLatch(1);
+ final RequestProcessor rp = new RequestProcessor("TestRP", count);
+ try {
+ class C implements Callable {
+ volatile boolean iAmSpecial;
+
+ private final String result;
+ volatile boolean ran;
+
+ C(String result) {
+ this.result = result;
+ }
+
+ @Override
+ public String call() throws Exception {
+ //Only one will be allowed to run, the rest
+ //will be cancelled
+ if (!iAmSpecial) {
+ blocker.await();
+ }
+ ran = true;
+ return result;
+ }
+ }
+ List callables = new ArrayList(count);
+ C special = new C("Special");
+ special.iAmSpecial = true;
+ callables.add(special);
+ List> fs;
+ Set names = new HashSet(count);
+ for (int i = 0; i < count; i++) {
+ String name = "R" + i;
+ names.add(name);
+ C c = new C(name);
+ callables.add(c);
+ }
+ fs = rp.invokeAll(callables, 1000, TimeUnit.MILLISECONDS);
+ assertNotNull(fs);
+ for (Future f : fs) {
+ assertTrue (f.isDone());
+ }
+ for (C c : callables) {
+ if (c == special) {
+ assertTrue (c.ran);
+ } else {
+ assertFalse(c.ran);
+ }
+ }
+ } finally {
+ rp.stop();
+ }
+ }
+
+ public void testInvokeAllSingleThread() throws Exception {
+ int count = 20;
+ final CountDownLatch waitAll = new CountDownLatch(count);
+ final RequestProcessor rp = new RequestProcessor("TestRP", 1);
+ class C implements Callable {
+
+ private final String result;
+
+ C(String result) {
+ this.result = result;
+ }
+
+ @Override
+ public String call() throws Exception {
+ waitAll.countDown();
+ return result;
+ }
+ }
+ List l = new ArrayList(count);
+ List> fs;
+ Set names = new HashSet(count);
+ for (int i = 0; i < count; i++) {
+ String name = "R" + i;
+ names.add(name);
+ C c = new C(name);
+ l.add(c);
+ }
+ fs = rp.invokeAll(l);
+ assertNotNull(fs);
+ Set s = new HashSet(count);
+ for (Future f : fs) {
+ s.add(f.get());
+ }
+ assertEquals(names, s);
+ }
+
+ public void testInvokeAny() throws Exception {
+ int count = 20;
+ final RequestProcessor rp = new RequestProcessor("TestRP", count + 1);
+ class C implements Callable {
+
+ private final String result;
+
+ C(String result) {
+ this.result = result;
+ }
+
+ @Override
+ public String call() throws Exception {
+ if (Thread.interrupted()) {
+ return null;
+ }
+ return result;
+ }
+ }
+ List l = new ArrayList(count);
+ Set names = new HashSet(count);
+ for (int i = 0; i < count; i++) {
+ String name = "R" + i;
+ names.add(name);
+ C c = new C(name);
+ l.add(c);
+ }
+ String res = rp.invokeAny(l);
+ assertNotNull(res);
+ assertTrue(res.startsWith("R"));
+ }
+
+ public void testInvokeAnySingleThread() throws Exception {
+ int count = 1000;
+ final RequestProcessor rp = new RequestProcessor("TestRP", 20);
+ final CountDownLatch latch = new CountDownLatch(count);
+ class C implements Callable {
+
+ volatile boolean hasRun;
+ private final String result;
+
+ C(String result) {
+ this.result = result;
+ }
+
+ @Override
+ public String call() throws Exception {
+ latch.countDown();
+ if (!"R17".equals(result)) {
+ try {
+ //Block all but one thread until threads have entered
+ latch.await();
+ } catch (InterruptedException ie) {
+ }
+ }
+ Thread.yield();
+ hasRun = true;
+ return result;
+ }
+ }
+ List l = new ArrayList(count);
+ Set names = new HashSet(count);
+ for (int i = 0; i < count; i++) {
+ String name = "R" + i;
+ names.add(name);
+ C c = new C(name);
+ l.add(c);
+ }
+ String res = rp.invokeAny(l);
+ assertNotNull(res);
+ assertTrue(res.startsWith("R"));
+ int runCount = 0;
+ for (C c : l) {
+ if (c.hasRun) {
+ runCount++;
+ }
+ }
+ assertTrue("Not all " + count + " threads should have completed, but " + runCount + " did.", runCount < count);
+ }
+
+ public void testInvokeAnyWithTimeout() throws Exception {
+ int count = 20;
+ final RequestProcessor rp = new RequestProcessor("TestRP", count + 1);
+ final CountDownLatch latch = new CountDownLatch(1);
+ class C implements Callable {
+
+ volatile boolean hasRun;
+ private final String result;
+
+ C(String result) {
+ this.result = result;
+ }
+
+ @Override
+ public String call() throws Exception {
+ latch.await();
+ if (Thread.interrupted()) {
+ return null;
+ }
+ hasRun = true;
+ return result;
+ }
+ }
+ List l = new ArrayList(count);
+ Set names = new HashSet(count);
+ for (int i = 0; i < count; i++) {
+ String name = "R" + i;
+ names.add(name);
+ C c = new C(name);
+ l.add(c);
+ }
+ //All threads are waiting on latch; we should time out
+ String res = rp.invokeAny(l, 400, TimeUnit.MILLISECONDS);
+ assertNull(res);
+ for (C c : l) {
+ assertFalse(c.hasRun);
+ }
+ }
+
+ public void testCancellation() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ class C implements Callable {
+
+ volatile boolean hasRun;
+ volatile boolean interrupted;
+
+ @Override
+ public String call() throws Exception {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ return null;
+ }
+ if (Thread.interrupted()) {
+ interrupted = true;
+ return null;
+ }
+ hasRun = true;
+ return "Hello";
+ }
+ }
+ C c = new C();
+ Future f = RequestProcessor.getDefault().submit(c);
+ f.cancel(true);
+ latch.countDown();
+ String s = null;
+ try {
+ s = f.get();
+ fail("CancellationException should have been thrown");
+ } catch (CancellationException e) {
+ }
+ assertNull(s);
+ assertTrue(c.interrupted || !c.hasRun);
+ assertFalse(c.hasRun);
+ }
+
+ public void testCancellablesGetCancelInvokedWithCallable() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch exit = new CountDownLatch(1);
+ class C implements Callable, Cancellable {
+
+ volatile boolean hasRun;
+ volatile boolean interrupted;
+ volatile boolean cancelled;
+
+ @Override
+ public String call() throws Exception {
+ try {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ return null;
+ }
+ if (Thread.interrupted()) {
+ interrupted = true;
+ return null;
+ }
+ if (cancelled) {
+ return null;
+ }
+ hasRun = true;
+ return "Hello";
+ } finally {
+ exit.countDown();
+ }
+ }
+
+ @Override
+ public boolean cancel() {
+ cancelled = true;
+ exit.countDown();
+ return true;
+ }
+ }
+ C c = new C();
+ Future f = RequestProcessor.getDefault().submit(c);
+ f.cancel(true);
+ assertTrue (c.cancelled);
+ latch.countDown();
+ exit.await();
+ String s = null;
+ try {
+ s = f.get();
+ fail ("Should have gotten cancellation exception");
+ } catch (CancellationException e) {
+
+ }
+ }
+
+ public void testCancellablesGetCancelInvokedWithRunnable() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch exit = new CountDownLatch(1);
+ class C implements Runnable, Cancellable {
+
+ volatile boolean hasRun;
+ volatile boolean interrupted;
+ volatile boolean cancelled;
+
+ @Override
+ public void run() {
+ try {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ return;
+ }
+ if (Thread.interrupted()) {
+ interrupted = true;
+ return;
+ }
+ if (cancelled) {
+ return;
+ }
+ hasRun = true;
+ } finally {
+ exit.countDown();
+ }
+ }
+
+ @Override
+ public boolean cancel() {
+ cancelled = true;
+ exit.countDown();
+ return true;
+ }
+ }
+ C c = new C();
+ Future> f = RequestProcessor.getDefault().submit(c);
+ f.cancel(true);
+ assertTrue (c.cancelled);
+ latch.countDown();
+ exit.await();
+ try {
+ f.get();
+ fail ("Should have gotten cancellation exception");
+ } catch (CancellationException e) {
+
+ }
+ assertFalse (c.hasRun);
+ }
+
+ public void testCancellablesThatSayTheyCantBeCancelledAreNotCancelledViaFutureDotCancel() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch exit = new CountDownLatch(1);
+ class C implements Runnable, Cancellable {
+
+ volatile boolean hasRun;
+ volatile boolean interrupted;
+ volatile boolean cancelCalled;
+
+ @Override
+ public void run() {
+ try {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ throw new AssertionError(e);
+ }
+ if (Thread.interrupted()) {
+ interrupted = true;
+ throw new AssertionError("Thread should not have been interrupted");
+ }
+ hasRun = true;
+ } finally {
+ exit.countDown();
+ }
+ }
+
+ @Override
+ public boolean cancel() {
+ cancelCalled = true;
+ return false;
+ }
+ }
+ C c = new C();
+ Future> f = RequestProcessor.getDefault().submit(c);
+ f.cancel(true);
+ assertFalse (f.isCancelled());
+ assertTrue (c.cancelCalled);
+ latch.countDown();
+ exit.await();
+ f.get();
+ assertFalse (f.isCancelled());
+ assertTrue (c.hasRun);
+ }
+
+ public void testInvokeAllCancellation() throws Exception {
+ int count = 20;
+ final CountDownLatch waitAll = new CountDownLatch(count);
+ final RequestProcessor rp = new RequestProcessor("TestRP", count);
+ class C implements Callable, Cancellable {
+
+ private final String result;
+ volatile boolean cancelCalled;
+
+ C(String result) {
+ this.result = result;
+ }
+
+ @Override
+ public String call() throws Exception {
+ waitAll.countDown();
+ return cancelCalled ? null : result;
+ }
+
+ @Override
+ public boolean cancel() {
+ cancelCalled = true;
+ return false;
+ }
+ }
+ List l = new ArrayList(count);
+ List> fs;
+ Set names = new HashSet(count);
+ for (int i = 0; i < count; i++) {
+ String name = "R" + i;
+ names.add(name);
+ C c = new C(name);
+ l.add(c);
+ }
+ fs = rp.invokeAll(l);
+ assertNotNull(fs);
+ Set s = new HashSet(count);
+ for (Future f : fs) {
+ s.add(f.get());
+ }
+ assertEquals(names, s);
+ }
+
+ public void testCannotScheduleLongerThanIntegerMaxValue() throws Exception {
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ fail ("Should not have been run");
+ }
+ };
+ try {
+ Future> f = RequestProcessor.getDefault().schedule(r, Long.MAX_VALUE, TimeUnit.DAYS);
+ f.cancel(true);
+ } catch (Exception e) {}
+ }
+
+ public void testCannotScheduleNegativeDelay() throws Exception {
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ fail ("Should not have been run");
+ }
+ };
+ try {
+ RequestProcessor.getDefault().schedule(r, -1L, TimeUnit.MILLISECONDS);
+ fail ("Negative value accepetd");
+ } catch (Exception e) {}
+ try {
+ RequestProcessor.getDefault().scheduleAtFixedRate(r, -1L, 22L, TimeUnit.MILLISECONDS);
+ fail ("Negative value accepetd");
+ } catch (Exception e) {}
+ try {
+ RequestProcessor.getDefault().scheduleAtFixedRate(r, 200, -22L, TimeUnit.MILLISECONDS);
+ fail ("Negative value accepetd");
+ } catch (Exception e) {}
+ try {
+ RequestProcessor.getDefault().scheduleWithFixedDelay(r, -1L, 22L, TimeUnit.MILLISECONDS);
+ fail ("Negative value accepetd");
+ } catch (Exception e) {}
+ try {
+ RequestProcessor.getDefault().scheduleWithFixedDelay(r, 1L, -22L, TimeUnit.MILLISECONDS);
+ fail ("Negative value accepetd");
+ } catch (Exception e) {}
+ }
+
+ public void testTaskCanRescheduleItself() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(2);
+ class R implements Runnable {
+ volatile RequestProcessor.Task task;
+ volatile int runCount;
+ @Override
+ public void run() {
+ runCount++;
+ if (runCount == 1) {
+ task.schedule(0);
+ }
+ latch.countDown();
+ }
+ }
+ R r = new R();
+ RequestProcessor.Task t = RequestProcessor.getDefault().create(r);
+ r.task = t;
+ t.schedule(0);
+ latch.await ();
+ assertEquals (r.runCount, 2);
+ }
+
+ public void testScheduleRepeatingSanityFixedRate() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(5);
+ class C implements Runnable {
+ volatile int runCount;
+ @Override
+ public void run() {
+ runCount++;
+ latch.countDown();
+ }
+ }
+ C c = new C();
+ RequestProcessor.getDefault().scheduleWithFixedDelay(c, 0, 20, TimeUnit.MILLISECONDS);
+// latch.await(5000, TimeUnit.MILLISECONDS);
+ latch.await();
+ assertEquals (5, c.runCount);
+ }
+
+ public void testScheduleRepeatingSanityFixedDelay() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(5);
+ class C implements Runnable {
+ volatile int runCount;
+ @Override
+ public void run() {
+ runCount++;
+ latch.countDown();
+ }
+ }
+ C c = new C();
+ RequestProcessor.getDefault().scheduleAtFixedRate(c, 0, 20, TimeUnit.MILLISECONDS);
+ latch.await(2000, TimeUnit.MILLISECONDS);
+
+ assertEquals (5, c.runCount);
+ }
+
+ public void testScheduleOneShot() throws Exception {
+ RequestProcessor rp = new RequestProcessor ("testScheduleOneShot", 5, true, true);
+ try {
+ class C implements Callable {
+ volatile long start = System.currentTimeMillis();
+ private volatile long end;
+
+ @Override
+ public String call() throws Exception {
+ synchronized(this) {
+ end = System.currentTimeMillis();
+ }
+ return "Hello";
+ }
+
+ synchronized long elapsed() {
+ return end - start;
+ }
+ }
+ C c = new C();
+ int delay = 5000;
+ //Use a 20 second timeout to have a reasonable chance of accuracy
+ ScheduledFuture f = rp.schedule(c, delay, TimeUnit.MILLISECONDS);
+ assertEquals (5000, f.getDelay(TimeUnit.MILLISECONDS));
+ assertNotNull(f.get());
+ //Allow 4 seconds fudge-factor
+ assertTrue (c.elapsed() > 4600);
+ assertTrue (f.isDone());
+ } finally {
+ rp.stop();
+ }
+ }
+
+ public void testScheduleRepeatingIntervalsAreRoughlyCorrect() throws Exception {
+ int runCount = 5;
+ final CountDownLatch latch = new CountDownLatch(runCount);
+ final List intervals = Collections.synchronizedList(new ArrayList (runCount));
+// long initialDelay = 30000;
+// long period = 20000;
+// long fudgeFactor = 4000;
+ long initialDelay = 3000;
+ long period = 2000;
+ long fudgeFactor = 400;
+ long expectedInitialDelay = initialDelay - fudgeFactor;
+ long expectedPeriod = period - fudgeFactor;
+ class C implements Runnable {
+ volatile long start = System.currentTimeMillis();
+ private int runCount;
+ @Override
+ public void run() {
+ runCount++;
+ try {
+ synchronized(this) {
+ long end = System.currentTimeMillis();
+ intervals.add (end - start);
+ start = end;
+ }
+ } finally {
+ latch.countDown();
+ }
+ }
+ }
+ C c = new C();
+ RequestProcessor rp = new RequestProcessor ("testScheduleRepeating", 5, true);
+ try {
+ Future> f = rp.scheduleWithFixedDelay(c, initialDelay, period, TimeUnit.MILLISECONDS);
+ // latch.await(initialDelay + fudgeFactor + ((runCount - 1) * (period + fudgeFactor)), TimeUnit.MILLISECONDS); //XXX
+ latch.await();
+ f.cancel(true);
+ for (int i= 0; i < Math.min(runCount, intervals.size()); i++) {
+ long expect = i == 0 ? expectedInitialDelay : expectedPeriod;
+ assertTrue ("Expected at least " + expect + " milliseconds before run " + i + " but was " + intervals.get(i), intervals.get(i) >= expect);
+ }
+ //Ensure we have really exited
+ try {
+ f.get();
+ fail ("CancellationException should have been thrown");
+ } catch (CancellationException e) {}
+ assertTrue(f.isCancelled());
+ assertTrue(f.isDone());
+ } finally {
+ rp.stop();
+ }
+ }
+
+ public void testScheduleFixedRateAreRoughlyCorrect() throws Exception {
+ int runCount = 5;
+ final CountDownLatch latch = new CountDownLatch(runCount);
+ final List intervals = Collections.synchronizedList(new ArrayList (runCount));
+// long initialDelay = 30000;
+// long period = 20000;
+// long fudgeFactor = 4000;
+ long initialDelay = 3000;
+ long period = 2000;
+ long fudgeFactor = 400;
+ long expectedInitialDelay = initialDelay - fudgeFactor;
+ long expectedPeriod = period - fudgeFactor;
+ class C implements Runnable {
+ volatile long start = System.currentTimeMillis();
+ private int runCount;
+ @Override
+ public void run() {
+ runCount++;
+ try {
+ synchronized(this) {
+ long end = System.currentTimeMillis();
+ intervals.add (end - start);
+ start = end;
+ }
+ } finally {
+ latch.countDown();
+ }
+ }
+ }
+ C c = new C();
+ RequestProcessor rp = new RequestProcessor ("testScheduleFixedRateAreRoughlyCorrect", 5, true);
+ try {
+ Future> f = rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS);
+ latch.await();
+ f.cancel(true);
+ for (int i= 0; i < Math.min(runCount, intervals.size()); i++) {
+ long expect = i == 0 ? expectedInitialDelay : expectedPeriod;
+ assertTrue ("Expected at least " + expect + " milliseconds before run " + i + " but was " + intervals.get(i), intervals.get(i) >= expect);
+ }
+ //Ensure we have really exited
+ try {
+ f.get();
+ fail ("CancellationException should have been thrown");
+ } catch (CancellationException e) {}
+ assertTrue(f.isCancelled());
+ assertTrue(f.isDone());
+ } finally {
+ rp.stop();
+ }
+ }
+
+ public void testScheduleFixedRateOnMultiThreadPoolDoesNotCauseConcurrentExecution() throws Exception {
+ final AtomicInteger val = new AtomicInteger(0);
+ final CountDownLatch latch = new CountDownLatch(10);
+ class C implements Runnable {
+ boolean failed;
+ @Override
+ public void run() {
+ try {
+ int now = val.incrementAndGet();
+ if (now > 1) {
+ failed = true;
+ fail (now + " threads simultaneously in run()");
+ }
+ try {
+ //Intentionally sleep *longer* than the interval
+ //between executions. We *want* to pile up all of the
+ //RP threads entering run() - synchronization should
+ //serialize them. This test is to prove that this
+ //method will never be called concurrently from two threads
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+
+ }
+ } finally {
+ val.decrementAndGet();
+ latch.countDown();
+ }
+ }
+ }
+ C c = new C();
+ long initialDelay = 2000;
+ long period = 10;
+ RequestProcessor rp = new RequestProcessor("testScheduleFixedRateOnMultiThreadPoolDoesNotCauseConcurrentExecution", 10, true);
+ rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS);
+ latch.await();
+ assertFalse(c.failed);
+ rp.stop();
+ }
+
+ @RandomlyFails
+ public void testScheduleFixedRateWithShorterIntervalThanRunMethodTimeAreNotDelayed() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(10);
+ final List intervals = new CopyOnWriteArrayList();
+ class C implements Runnable {
+ long start = Long.MIN_VALUE;
+
+ @Override
+ public void run() {
+ long end = System.currentTimeMillis();
+ if (start != Long.MIN_VALUE) {
+ intervals.add(end - start);
+ }
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ex) {
+
+ }
+ start = System.currentTimeMillis();
+ latch.countDown();
+ }
+ }
+ C c = new C();
+ long initialDelay = 100;
+ long period = 100;
+ RequestProcessor rp = new RequestProcessor("testScheduleFixedRateWithShorterIntervalThanRunMethodTimeAreNotDelayed", 10, true);
+ ScheduledFuture> f = rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS);
+ latch.await();
+ f.cancel(true);
+ rp.stop();
+ int max = intervals.size();
+ for (int i= 0; i < max; i++) {
+ long iv = intervals.get(i);
+ assertFalse ("Interval " + i + " should have been at least less than requested interval * 1.5 with fixed rate" + iv, iv > 150);
+ }
+ }
+
+ public void testCancelFutureInterruptsThreadEvenIfRequestProcessorForbidsIt() throws Exception {
+ RequestProcessor rp = new RequestProcessor ("X", 3, false, true);
+ final CountDownLatch releaseForRun = new CountDownLatch(1);
+ final CountDownLatch enterLatch = new CountDownLatch(1);
+ final CountDownLatch exitLatch = new CountDownLatch(1);
+ class R implements Runnable {
+ volatile boolean interrupted;
+ @Override
+ public void run() {
+ enterLatch.countDown();
+ try {
+ releaseForRun.await();
+ } catch (InterruptedException ex) {
+ interrupted = true;
+ }
+ interrupted |= Thread.interrupted();
+ exitLatch.countDown();
+ }
+ }
+ R r = new R();
+ Future> f = rp.submit(r);
+ enterLatch.await();
+ f.cancel(true);
+ assertTrue (f.isCancelled());
+ exitLatch.await();
+ assertTrue (r.interrupted);
+ }
+
+ public void testCancelDoesNotInterruptIfNotPassedToFutureDotCancel() throws Exception {
+ RequestProcessor rp = new RequestProcessor ("X", 3, false, true);
+ final CountDownLatch releaseForRun = new CountDownLatch(1);
+ final CountDownLatch enterLatch = new CountDownLatch(1);
+ final CountDownLatch exitLatch = new CountDownLatch(1);
+ class R implements Runnable {
+ volatile boolean interrupted;
+ @Override
+ public void run() {
+ enterLatch.countDown();
+ try {
+ releaseForRun.await();
+ } catch (InterruptedException ex) {
+ interrupted = true;
+ }
+ interrupted |= Thread.interrupted();
+ exitLatch.countDown();
+ }
+ }
+ R r = new R();
+ Future> f = rp.submit(r);
+ enterLatch.await();
+ f.cancel(false);
+ assertTrue (f.isCancelled());
+ assertFalse (r.interrupted);
+ }
+
+ public void testCancelDoesInterruptIfRequestProcessorSpecifiesItEvenIfFalsePassedToFutureDotCancel() throws Exception {
+ RequestProcessor rp = new RequestProcessor ("X", 3, true, true);
+ final CountDownLatch releaseForRun = new CountDownLatch(1);
+ final CountDownLatch enterLatch = new CountDownLatch(1);
+ final CountDownLatch exitLatch = new CountDownLatch(1);
+ class R implements Runnable {
+ volatile boolean interrupted;
+ @Override
+ public void run() {
+ enterLatch.countDown();
+ try {
+ releaseForRun.await();
+ } catch (InterruptedException ex) {
+ interrupted = true;
+ }
+ interrupted |= Thread.interrupted();
+ exitLatch.countDown();
+ }
+ }
+ R r = new R();
+ Future> f = rp.submit(r);
+ enterLatch.await();
+ f.cancel(false);
+ assertTrue (f.isCancelled());
+ exitLatch.await();
+ assertTrue (r.interrupted);
+ }
+
+ public void testReschedulable() throws Exception {
+ final int count = 25;
+ final CountDownLatch latch = new CountDownLatch(count);
+ final RequestProcessor rp = new RequestProcessor("testReschedulable", 5, true);
+ final AtomicBoolean reschedulerLock = new AtomicBoolean(false);
+ final AtomicBoolean runExit = new AtomicBoolean(false);
+ Thread rescheduler = null;
+ try {
+ class R implements Runnable {
+ volatile ReschedulableFuture> f;
+ volatile int runCount;
+ @Override
+ public void run() {
+ runCount++;
+ latch.countDown();
+ if (latch.getCount() > 0) {
+ reschedulerLock.set(true);
+ }
+ runExit.set(true);
+ }
+ }
+ final R r = new R();
+ r.f = rp.createReschedulable(r);
+ assertFalse(r.f.isCancelled());
+ assertFalse(r.f.isDone());
+
+ class Rescheduler implements Runnable {
+ @Override
+ public void run() {
+ //Intentionally use a busywait, not a lock - more overhead,
+ //but more robust
+ for(;;) {
+ try {
+ Thread.sleep(200);
+ boolean go = reschedulerLock.get();
+ if (go) {
+ reschedulerLock.set(false);
+ //ensure the previous run has really exited
+ do {
+ Thread.sleep(10);
+ } while (!runExit.get());
+ Thread.sleep(20);
+ runExit.set(false);
+ r.f.schedule(10);
+ }
+ } catch (InterruptedException ex) {
+ return;
+ }
+ }
+ }
+ }
+ rescheduler = new Thread(new Rescheduler(), "testReschedulable Rescheduler thread");
+ rescheduler.start();
+ reschedulerLock.set(true);
+ r.f.schedule(10);
+ latch.await();
+ assertTrue (latch.getCount() == 0);
+ assertTrue (r.runCount == count);
+ assertFalse(r.f.isCancelled());
+ assertTrue(r.f.isDone());
+ assertTrue(runExit.get());
+ } finally {
+ rp.stop();
+ if (rescheduler != null) {
+ rescheduler.interrupt();
+ }
+ }
+ }
+
+ public void testReschedulableGet() throws Exception {
+ //Yes, the busywaits are ugly. However, they are locking-bug-proof -
+ //we want to be testing our code that deals with complex inter-thread
+ //locking - add more locking complexity and what are we testing?
+ final int count = 25;
+ final CountDownLatch latch = new CountDownLatch(count);
+ final RequestProcessor rp = new RequestProcessor("testReschedulable", 1, true);
+ final AtomicBoolean reschedulerLock = new AtomicBoolean(false);
+ final AtomicBoolean runExit = new AtomicBoolean(false);
+ final AtomicBoolean running = new AtomicBoolean(true);
+ final AtomicBoolean suspend = new AtomicBoolean(false);
+ final AtomicBoolean waitForSuspend = new AtomicBoolean(false);
+ Thread rescheduler = null;
+ try {
+ class R implements Runnable {
+ volatile ReschedulableFuture> f;
+ volatile int runCount;
+ @Override
+ public void run() {
+ runCount++;
+ if (runCount == 20) {
+ waitForSuspend.set(true);
+ while (suspend.get()) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ex) {
+
+ }
+ }
+ }
+ latch.countDown();
+ if (latch.getCount() > 0) {
+ reschedulerLock.set(true);
+ running.set(true);
+ } else {
+ running.set(false);
+ }
+ runExit.set(true);
+ }
+ }
+ final R r = new R();
+ r.f = rp.createReschedulable(r);
+ assertFalse(r.f.isCancelled());
+ assertFalse(r.f.isDone());
+
+ class Rescheduler implements Runnable {
+ @Override
+ public void run() {
+ //Intentionally use a busywait, not a lock - more overhead,
+ //but more robust
+ for(;;) {
+ try {
+ Thread.sleep(200);
+ boolean go = reschedulerLock.get();
+ if (go) {
+ reschedulerLock.set(false);
+ //ensure the previous run has really exited
+ do {
+ Thread.sleep(10);
+ } while (!runExit.get());
+ Thread.sleep(20);
+ runExit.set(false);
+ r.f.schedule(10);
+ }
+ } catch (InterruptedException ex) {
+ return;
+ }
+ }
+ }
+ }
+ rescheduler = new Thread(new Rescheduler(), "testReschedulableGet Rescheduler thread");
+ rescheduler.start();
+ reschedulerLock.set(true);
+ suspend.set(true);
+ r.f.schedule(10);
+ r.f.get();
+ while (!waitForSuspend.get());
+ assertTrue(running.get());
+ assertTrue ("get() should have blocked for at least one run", latch.getCount() < count);
+ assertTrue (r.runCount > 0);
+ assertTrue (r.runCount < count);
+ assertFalse(r.f.isCancelled());
+ assertFalse(r.f.isDone());
+ suspend.set(false);
+
+ } finally {
+ rp.stop();
+ if (rescheduler != null) {
+ rescheduler.interrupt();
+ }
+ }
+ }
+
+ public void testReschedulableCancel() throws Exception {
+ //Yes, the busywaits are ugly. However, they are locking-bug-proof -
+ //we want to be testing our code that deals with complex inter-thread
+ //locking - add more locking complexity and what are we testing?
+ final int count = 25;
+ final CountDownLatch latch = new CountDownLatch(count);
+ final RequestProcessor rp = new RequestProcessor("testReschedulable", 1, true);
+ final AtomicBoolean reschedulerLock = new AtomicBoolean(false);
+ final AtomicBoolean runExit = new AtomicBoolean(false);
+ final AtomicBoolean running = new AtomicBoolean(true);
+ final AtomicBoolean suspend = new AtomicBoolean(false);
+ final AtomicBoolean waitForSuspend = new AtomicBoolean(false);
+ final Object lock = new Object();
+ Thread rescheduler = null;
+ try {
+ class R implements Runnable {
+ volatile ReschedulableFuture> f;
+ volatile int runCount;
+ volatile boolean interrupted;
+ @Override
+ public void run() {
+ runCount++;
+ if (runCount == 20) {
+ waitForSuspend.set(true);
+ while (suspend.get()) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ex) {
+ interrupted = true;
+ running.set(false);
+ }
+ }
+ }
+ latch.countDown();
+ if (latch.getCount() > 0) {
+ reschedulerLock.set(true);
+ running.set(true);
+ } else {
+ running.set(false);
+ }
+ runExit.set(true);
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
+ }
+ final R r = new R();
+ r.f = rp.createReschedulable(r);
+ assertFalse(r.f.isCancelled());
+ assertFalse(r.f.isDone());
+
+ class Rescheduler implements Runnable {
+ @Override
+ public void run() {
+ //Intentionally use a busywait, not a lock - more overhead,
+ //but more robust
+ for(;;) {
+ try {
+ Thread.sleep(200);
+ boolean go = reschedulerLock.get();
+ if (go) {
+ reschedulerLock.set(false);
+ //ensure the previous run has really exited
+ do {
+ Thread.sleep(10);
+ } while (!runExit.get());
+ Thread.sleep(20);
+ runExit.set(false);
+ r.f.schedule(10);
+ }
+ } catch (InterruptedException ex) {
+ return;
+ }
+ }
+ }
+ }
+ rescheduler = new Thread(new Rescheduler(), "testReschedulableGet Rescheduler thread");
+ rescheduler.start();
+ reschedulerLock.set(true);
+ suspend.set(true);
+ r.f.schedule(10);
+ while (!waitForSuspend.get());
+ r.f.cancel(true);
+ while (running.get());
+ assertTrue (r.interrupted);
+
+ assertFalse(running.get());
+ assertTrue (r.runCount > 0);
+ assertTrue (r.runCount < count);
+ assertTrue(r.f.isCancelled());
+ assertTrue(r.f.isDone());
+ suspend.set(false);
+ rescheduler.stop();
+ runExit.set(false);
+ r.f.schedule(100);
+ assertFalse (r.f.isCancelled());
+ assertFalse (r.f.isDone());
+ synchronized (lock) {
+ lock.wait();
+ }
+ Thread.sleep (1000);
+ assertFalse (r.f.isCancelled());
+ assertTrue (r.f.isDone());
+ } finally {
+ rp.stop();
+ if (rescheduler != null && rescheduler.isAlive()) {
+ rescheduler.interrupt();
+ }
+ }
+ }
+
+ static void wait (AtomicBoolean val) {
+ while (!val.get()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ex) {
+ return;
+ }
+ }
+ val.set(false);
+ }
+
+ public void testReschedulableCancelAndRestart() throws Exception {
+ final AtomicBoolean runReleased = new AtomicBoolean(false);
+ final AtomicBoolean inRun = new AtomicBoolean(false);
+ class R implements Runnable {
+ @Override
+ public void run() {
+ inRun.set(true);
+ try {
+ wait (runReleased);
+ } finally {
+ inRun.set(false);
+ }
+ }
+ }
+ R r = new R();
+ RequestProcessor rp = new RequestProcessor("testReschedulableCancelSimple", 5, true);
+ ReschedulableFuture f = rp.createReschedulable(r, true);
+ assertTrue (f.isDone());
+ assertFalse (f.isCancelled());
+ f.schedule (100);
+ wait (inRun);
+ assertFalse(f.isDone());
+ assertFalse(f.isCancelled());
+ runReleased.set(true);
+ f.get();
+ assertFalse(f.isCancelled());
+ assertTrue (f.isDone());
+
+ f.schedule(100);
+ assertFalse(f.isDone());
+ assertFalse(f.isCancelled());
+ f.cancel(true);
+ assertTrue (f.isCancelled());
+ assertTrue (f.isDone());
+ f.schedule(100);
+ assertFalse (f.isCancelled());
+ wait(inRun);
+ assertFalse(f.isDone());
+ assertFalse(f.isCancelled());
+ rp.stop();
+ }
+}