diff --git a/openide.util/manifest.mf b/openide.util/manifest.mf --- a/openide.util/manifest.mf +++ b/openide.util/manifest.mf @@ -1,5 +1,5 @@ Manifest-Version: 1.0 OpenIDE-Module: org.openide.util OpenIDE-Module-Localizing-Bundle: org/openide/util/Bundle.properties -OpenIDE-Module-Specification-Version: 8.1 +OpenIDE-Module-Specification-Version: 8.2 diff --git a/openide.util/src/org/openide/util/RequestProcessor.java b/openide.util/src/org/openide/util/RequestProcessor.java --- a/openide.util/src/org/openide/util/RequestProcessor.java +++ b/openide.util/src/org/openide/util/RequestProcessor.java @@ -41,15 +41,33 @@ package org.openide.util; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.ListIterator; +import java.util.Set; import java.util.Stack; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -154,10 +172,12 @@ * } * } * - * - * @author Petr Nejedly, Jaroslav Tulach + *

+ * Since org.openide.util 8.2, implements
+ * {@link java.util.concurrent.ScheduledExecutorService}
+ * @author Petr Nejedly, Jaroslav Tulach, Tim Boudreau
  */
-public final class RequestProcessor implements Executor {
+public final class RequestProcessor implements ScheduledExecutorService {
     /** the static instance for users that do not want to have own processor */
     private static RequestProcessor DEFAULT = new RequestProcessor();
 
@@ -186,7 +206,7 @@
 
     /** If the RP was stopped, this variable will be set, every new post()
      * will throw an exception and no task will be processed any further */
-    boolean stopped = false;
+    volatile boolean stopped = false;
 
     /** The lock covering following four fields. They should be accessed
      * only while having this lock held. */
@@ -585,6 +605,765 @@
         }
     }
 
+    /**
+     * {@inheritDoc}
+     * @throws IllegalStateException if called on the
+     * {@linkplain #getDefault default request processor}
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public void shutdown() {
+        if (this == UNLIMITED) {
+            throw new IllegalStateException ("Cannot shut down the default " + //NOI18N
+                    "request processor"); //NOI18N
+        }
+        stop();
+    }
+
+    /**
+     * {@inheritDoc}
+     * The returned list holds the runnable currently assigned to each
+     * processor thread, unwrapped to the user's runnable where the
+     * wrapper exposes one.
+     * @throws IllegalStateException if called on the
+     * {@linkplain #getDefault default request processor}
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public List<Runnable> shutdownNow() {
+        if (this == UNLIMITED) {
+            throw new IllegalStateException ("Cannot shut down the default " + //NOI18N
+                    "request processor"); //NOI18N
+        }
+        //XXX more aggressive shutdown?
+        stop();
+        synchronized (processorLock) {
+            List<Runnable> result = new ArrayList<Runnable>(processors.size());
+            for (Processor p : processors) {
+                if (p != null && p.todo != null && p.todo.run != null) {
+                    Runnable r = p.todo.run;
+                    if (r instanceof RunnableWrapper) {
+                        // Report the runnable the caller submitted, not our wrapper
+                        Runnable other = ((RunnableWrapper) r).getRunnable();
+                        r = other == null ? r : other;
+                    }
+                    result.add(r);
+                }
+            }
+            return result;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public boolean isShutdown() {
+        return stopped;
+    }
+
+    /**
+     * {@inheritDoc}
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public boolean isTerminated() {
+        boolean result = true;
+        Set<Processor> set = collectProcessors(new HashSet<Processor>());
+        for (Processor p : set) {
+            if (p.isAlive() && p.belongsTo(this)) {
+                result = false;
+                break;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * {@inheritDoc}
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        Parameters.notNull("unit", unit); //NOI18N
+        // TimeUnit.convert(d, u) converts FROM u TO the receiver, so the
+        // timeout must be converted with toMillis(), not unit.convert(.., MILLISECONDS).
+        long timeoutMillis = unit.toMillis(timeout);
+        boolean result = stopped;
+        long now = System.currentTimeMillis();
+        // Saturate instead of overflowing for very large timeouts
+        long doneTime = timeoutMillis > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMillis;
+        Set<Processor> procs = new HashSet<Processor>();
+outer:  do {
+            procs = collectProcessors(procs);
+            if (procs.isEmpty()) {
+                return true;
+            }
+            for (Processor p : procs) {
+                long remaining = doneTime - System.currentTimeMillis();
+                if (remaining <= 0) {
+                    // Out of time: report whether anything still belongs to us.
+                    // Safe to refill procs here because we leave the loop at once.
+                    result = collectProcessors(procs).isEmpty();
+                    break outer;
+                }
+                if (p.belongsTo(this)) {
+                    p.join(remaining);
+                }
+                result = !p.isAlive() || !p.belongsTo(this);
+            }
+            procs.clear();
+        } while (!procs.isEmpty());
+        return result;
+    }
+
+    /**
+     * Refills the passed set with the processors currently owned by this
+     * request processor.
+     * @param procs the set to clear and refill
+     * @return the same set instance
+     */
+    private Set<Processor> collectProcessors (Set<Processor> procs) {
+        procs.clear();
+        synchronized (processorLock) {
+            for (Processor p : processors) {
+                if (p.belongsTo(this)) {
+                    procs.add(p);
+                }
+            }
+        }
+        return procs;
+    }
+
+    /**
+     * {@inheritDoc}
+     *

+     * Note: If the passed {@link java.util.concurrent.Callable} implements
+     * {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()}
+     * method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked.
+     * If {@code Cancellable.cancel()} returns false, then the job will not be
+     * cancelled.
+     * @throws RejectedExecutionException if this request processor has been stopped
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        Parameters.notNull("task", task); //NOI18N
+        if (stopped) {
+            throw new RejectedExecutionException("Request Processor already " + //NOI18N
+                    "stopped"); //NOI18N
+        }
+        RPFutureTask<T> result = new RPFutureTask<T>(task);
+        Task t = create(result);
+        // The future needs its Task so that Future.cancel() can dequeue it
+        result.setTask(t);
+        t.schedule(0);
+        return result;
+    }
+    /**
+     * {@inheritDoc}
+     * Note: If the passed {@link java.lang.Runnable} implements
+     * {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()}
+     * method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked.
+     * If {@code Cancellable.cancel()} returns false, then the job will not be
+     * cancelled.
+     * @throws RejectedExecutionException if this request processor has been stopped
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public <T> Future<T> submit(Runnable task, T predefinedResult) {
+        Parameters.notNull("task", task); //NOI18N
+        if (stopped) {
+            throw new RejectedExecutionException("Request Processor already " + //NOI18N
+                    "stopped"); //NOI18N
+        }
+        RPFutureTask<T> result = new RPFutureTask<T>(task, predefinedResult);
+        Task t = create(result);
+        // The future needs its Task so that Future.cancel() can dequeue it
+        result.setTask(t);
+        t.schedule(0);
+        return result;
+    }
+
+    /**
+     * {@inheritDoc}
+     *

+     * Note: If the passed {@link java.lang.Runnable} implements
+     * {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()}
+     * method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked.
+     * If {@code Cancellable.cancel()} returns false, then the job will not be
+     * cancelled.
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public Future<?> submit(Runnable task) {
+        return this.submit (task, null);
+    }
+
+    /**
+     * {@inheritDoc}
+     * @throws NullPointerException if the collection contains a null task
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        Parameters.notNull("tasks", tasks); //NOI18N
+        List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
+        // One count per task; each WaitableCallable counts down when it finishes
+        CountDownLatch wait = new CountDownLatch(tasks.size());
+        for (Callable<T> c : tasks) {
+            if (c == null) {
+                throw new NullPointerException ("Contains null tasks: " + //NOI18N
+                        tasks);
+            }
+            Callable<T> delegate = new WaitableCallable<T>(c, wait);
+            result.add (submit(delegate));
+        }
+        wait.await();
+        return result;
+    }
+
+    /**
+     * {@inheritDoc}
+     *

+     * Executes the given tasks, returning a list of Futures holding their
+     * status and results when all complete or the timeout expires, whichever
+     * happens first.
+     * @throws NullPointerException if the collection contains a null task
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+        Parameters.notNull("unit", unit); //NOI18N
+        Parameters.notNull("tasks", tasks); //NOI18N
+        // One count per task; each WaitableCallable counts down when it finishes
+        CountDownLatch wait = new CountDownLatch(tasks.size());
+        List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
+        for (Callable<T> c : tasks) {
+            if (c == null) {
+                throw new NullPointerException ("Contains null tasks: " + tasks); //NOI18N
+            }
+            Callable<T> delegate = new WaitableCallable<T>(c, wait);
+            result.add (submit(delegate));
+        }
+        if (!wait.await(timeout, unit)) {
+            // Timed out: cancel everything; cancelling a completed future is a no-op
+            for (Future<T> f : result) {
+                RPFutureTask<T> ft = (RPFutureTask<T>) f;
+                ft.cancel(true);
+            }
+        }
+        return result;
+    }
+    /**
+     * {@inheritDoc}
+     *

+     * Executes the given tasks, returning the result of one which has
+     * completed and cancelling any incomplete tasks.
+     * @throws IllegalArgumentException if the collection is empty
+     * @throws NullPointerException if the collection contains a null task
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        Parameters.notNull("tasks", tasks); //NOI18N
+        if (tasks.isEmpty()) {
+            // With nothing to run the latch below would never be released
+            throw new IllegalArgumentException ("Empty task collection"); //NOI18N
+        }
+        // Released by the first task that stores its result in ref
+        CountDownLatch wait = new CountDownLatch(1);
+        List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
+        AtomicReference<T> ref = new AtomicReference<T>();
+        try {
+            for (Callable<T> c : tasks) {
+                if (c == null) {
+                    throw new NullPointerException ("Contains null tasks: " + //NOI18N
+                            tasks);
+                }
+                Callable<T> delegate = new WaitableCallable<T>(c, ref, wait);
+                result.add (submit(delegate));
+            }
+            wait.await();
+        } finally {
+            // Whatever happened, do not leave the losers running
+            for (Future<T> f : result) {
+                RPFutureTask<T> ft = (RPFutureTask<T>) f;
+                ft.cancel(true);
+            }
+        }
+        return ref.get();
+    }
+    /**
+     * {@inheritDoc}
+     *

+     * Executes the given tasks, returning the result of one which has
+     * completed and cancelling any incomplete tasks, or fails with a
+     * {@link java.util.concurrent.TimeoutException} if no task completes
+     * before the timeout expires.
+     * @param <T> The result type
+     * @param tasks A collection of callables
+     * @param timeout The maximum time to wait for completion, in the specified time units
+     * @param unit The time unit
+     * @return The result returned by one of the tasks
+     * @throws InterruptedException if execution is interrupted while waiting
+     * @throws TimeoutException if the timeout expires before any task completes
+     * @throws IllegalArgumentException if the collection is empty
+     * @throws NullPointerException if the collection contains a null task
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        Parameters.notNull("unit", unit); //NOI18N
+        Parameters.notNull("tasks", tasks); //NOI18N
+        if (tasks.isEmpty()) {
+            throw new IllegalArgumentException ("Empty task collection"); //NOI18N
+        }
+        // Released by the first task that stores its result in ref
+        CountDownLatch wait = new CountDownLatch(1);
+        List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
+        AtomicReference<T> ref = new AtomicReference<T>();
+        try {
+            for (Callable<T> c : tasks) {
+                if (c == null) {
+                    throw new NullPointerException ("Contains null tasks: " + //NOI18N
+                            tasks);
+                }
+                Callable<T> delegate = new WaitableCallable<T>(c, ref, wait);
+                result.add (submit(delegate));
+            }
+            if (!wait.await(timeout, unit)) {
+                // Previously the timeout was swallowed and null was returned
+                throw new TimeoutException ("No task completed within " + //NOI18N
+                        timeout + " " + unit); //NOI18N
+            }
+        } finally {
+            // Whatever happened, do not leave the losers running
+            for (Future<T> f : result) {
+                RPFutureTask<T> ft = (RPFutureTask<T>) f;
+                ft.cancel(true);
+            }
+        }
+        return ref.get();
+    }
+
+    /**
+     * {@inheritDoc}
+     * @throws IllegalArgumentException if delay is negative
+     * @throws RejectedExecutionException if this request processor has been stopped
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+        Parameters.notNull("command", command); //NOI18N
+        Parameters.notNull("unit", unit); //NOI18N
+        if (delay < 0) {
+            throw new IllegalArgumentException ("Negative delay: " + delay);
+        }
+        if (stopped) {
+            throw new RejectedExecutionException("Request Processor already stopped"); //NOI18N
+        }
+        // unit.convert(delay, MILLISECONDS) would convert in the wrong direction
+        long delayMillis = unit.toMillis(delay);
+        ScheduledRPFutureTask<Void> result = new ScheduledRPFutureTask<Void>(command, null, delayMillis);
+        Task t = create(result);
+        result.setTask(t);
+        t.schedule(delayMillis);
+        return result;
+    }
+    /**
+     * {@inheritDoc}
+     * @throws IllegalArgumentException if delay is negative
+     * @throws RejectedExecutionException if this request processor has been stopped
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public <T> ScheduledFuture<T> schedule(Callable<T> callable, long delay, TimeUnit unit) {
+        Parameters.notNull("unit", unit); //NOI18N
+        Parameters.notNull("callable", callable); //NOI18N
+        if (delay < 0) {
+            throw new IllegalArgumentException ("Negative delay: " + delay);
+        }
+        if (stopped) {
+            throw new RejectedExecutionException("Request Processor already " + //NOI18N
+                    "stopped"); //NOI18N
+        }
+        // unit.convert(delay, MILLISECONDS) would convert in the wrong direction
+        long delayMillis = unit.toMillis(delay);
+        ScheduledRPFutureTask<T> result = new ScheduledRPFutureTask<T>(callable, delayMillis);
+        Task t = create(result);
+        result.setTask(t);
+        t.schedule(delayMillis);
+        return result;
+    }
+
+    /**
+     * {@inheritDoc}
+     *

+     * Schedules a runnable which will run with a given frequency, regardless
+     * of how long execution takes, with the exception that if execution takes
+     * longer than the specified delay, execution will be delayed but will
+     * never be run on two threads concurrently.
+     * @throws IllegalArgumentException if initialDelay or period is negative
+     * @throws RejectedExecutionException if this request processor has been stopped
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+        // fixedDelay == false selects the fixed-rate task implementation
+        return scheduleFixed(command, initialDelay, period, unit, false);
+    }
+
+    /**
+     * {@inheritDoc}
+     *

+     * Schedules a runnable which will run repeatedly after the specified initial
+     * delay, with the specified delay between the completion of one run and
+     * the start of the next.
+     * @throws IllegalArgumentException if initialDelay or delay is negative
+     * @throws RejectedExecutionException if this request processor has been stopped
+     * @since org.openide.util 8.2
+     */
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        // fixedDelay == true selects the fixed-delay task implementation
+        return scheduleFixed(command, initialDelay, delay, unit, true);
+    }
+
+    /**
+     * Create a Future which can be rescheduled and run multiple times. Such
+     * repeatable tasks are useful, for example, for cases where a GUI component
+     * should perform some background initialization when first shown, but
+     * should cancel that initialization if hidden; or cases where expensive
+     * background processing should be postponed on user input, but run after
+     * a certain amount of idle time.
+     *

+ * Note that this method creates a {@link ReschedulableFuture} — + * it does not enqueue it to run. To enqueue it to run, call + * the returned object's {@link ReschedulableFuture#schedule(long)} method. + *

+ * The returned Future may be cancelled, and then rescheduled to run + * later; if its {@link ReschedulableFuture#schedule(long)} method is + * called after it has been scheduled, but before it has run, it will run after + * the new timeout. + *

+ * The returned {@link ReschedulableFuture} will return false + * from isDone() until it has run for the first time. + *

+     * Any call to one of the schedule() methods will reset the
+     * cancelled state to false.
+     *
+     * @param runnable A runnable
+     * @return A future which can be scheduled, cancelled and rescheduled as
+     * desired.
+     */
+    public ReschedulableFuture createReschedulable(Runnable runnable) {
+        // initiallyFinished == false: isDone() stays false until the first run
+        return createReschedulable(runnable, false);
+    }
+
+    /**
+     * Create a Future which can be rescheduled and run multiple times. Such
+     * repeatable tasks are useful, for example, for cases where a GUI component
+     * should perform some background initialization when first shown, but
+     * should cancel that initialization if hidden; or cases where expensive
+     * background processing should be postponed on user input, but run after
+     * a certain amount of idle time.
+     *

+ * Note that this method creates a {@link ReschedulableFuture} — + * it does not enqueue it to run. To enqueue it to run, call + * the returned object's {@link ReschedulableFuture#schedule(long)} method. + *

+ * The returned Future may be cancelled, and then rescheduled to run + * later; if its {@link ReschedulableFuture#schedule(long)} method is + * called after it has been scheduled, but before it has run, it will run after + * the new timeout. + *

+ * The returned {@link ReschedulableFuture} will return the value of the + * passed argument initiallyFinished + * from isDone() until it has run for the first time. + *

+ * Any call to one of the schedule() methods will reset the + * cancelled state to false. + *

+     * Calling get() on the returned Future will
+     * block until the next time the task completes, or until it is cancelled.
+     *
+     * @param runnable A runnable
+     * @param initiallyFinished If true, isDone() will initially
+     * return true.
+     * @return A future which can be scheduled, cancelled and rescheduled as
+     * desired.
+     */
+    public ReschedulableFuture createReschedulable(Runnable runnable, boolean initiallyFinished) {
+        Task task = create(runnable, initiallyFinished);
+        ReschedulableFutureImpl impl = new ReschedulableFutureImpl(runnable, task);
+        return impl;
+    }
+
+    /**
+     * Shared implementation of scheduleAtFixedRate and scheduleWithFixedDelay.
+     * @param command the runnable to run repeatedly
+     * @param initialDelay delay before the first run, in <code>unit</code>
+     * @param period period or inter-run delay, in <code>unit</code>
+     * @param unit the unit of initialDelay and period
+     * @param fixedDelay true for fixed-delay, false for fixed-rate
+     * @return the future controlling the repeating task
+     */
+    private ScheduledFuture<?> scheduleFixed (Runnable command, long initialDelay, long period, TimeUnit unit, boolean fixedDelay) {
+        Parameters.notNull("unit", unit); //NOI18N
+        Parameters.notNull("command", command); //NOI18N
+        if (period < 0) {
+            throw new IllegalArgumentException ("Negative delay: " + period); //NOI18N
+        }
+        if (initialDelay < 0) {
+            throw new IllegalArgumentException ("Negative initialDelay: " //NOI18N
+                    + initialDelay);
+        }
+        if (stopped) {
+            throw new RejectedExecutionException("Request Processor already " + //NOI18N
+                    "stopped"); //NOI18N
+        }
+        // unit.convert(x, MILLISECONDS) would convert in the wrong direction
+        long initialDelayMillis = unit.toMillis(initialDelay);
+        long periodMillis = unit.toMillis(period);
+
+        // Both task kinds work in milliseconds, so both get the converted delay
+        TaskFutureWrapper wrap = fixedDelay ?
+            new FixedDelayTask(command, initialDelayMillis, periodMillis) :
+            new FixedRateTask(command, initialDelayMillis, periodMillis);
+        Task t = create(wrap);
+        wrap.t = t;
+        // Share the flag so Task.schedule()/cancel() and the wrapper agree
+        t.cancelled = wrap.cancelled;
+        t.schedule (initialDelayMillis);
+
+        return wrap;
+    }
+
+    /**
+     * Base class of the repeating tasks: a ScheduledFuture view over a Task
+     * which runs the user's runnable. Delays are kept in milliseconds.
+     */
+    private static abstract class TaskFutureWrapper implements ScheduledFuture<Void>, Runnable, RunnableWrapper {
+        volatile Task t;
+        protected final Runnable toRun;
+        protected final long initialDelay;
+        protected final long period;
+        final AtomicBoolean cancelled = new AtomicBoolean();
+        TaskFutureWrapper(Runnable run, long initialDelay, long period) {
+            this.toRun = run;
+            this.initialDelay = initialDelay;
+            this.period = period;
+        }
+
+        @Override
+        public final Runnable getRunnable() {
+            return toRun;
+        }
+
+        @Override
+        public int compareTo(Delayed o) {
+            long other = o.getDelay(TimeUnit.MILLISECONDS);
+            long ours = getDelay(TimeUnit.MILLISECONDS);
+            // Compare rather than subtract: (int) (ours - other) overflows
+            return ours < other ? -1 : (ours == other ? 0 : 1);
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            boolean result = true;
+            if (toRun instanceof Cancellable) {
+                result = ((Cancellable) toRun).cancel();
+            }
+            if (result) {
+                //will invoke cancelled.set(true)
+                result = t.cancel(mayInterruptIfRunning);
+            }
+            return result;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return cancelled.get();
+        }
+
+        @Override
+        public boolean isDone() {
+            return cancelled.get() || t.isFinished();
+        }
+
+        @Override
+        public Void get() throws InterruptedException, ExecutionException {
+            if (cancelled.get()) {
+                throw new CancellationException();
+            }
+            t.waitFinished();
+            if (cancelled.get()) {
+                throw new CancellationException();
+            }
+            return null;
+        }
+
+        @Override
+        public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            if (cancelled.get()) {
+                throw new CancellationException();
+            }
+            // unit.convert(timeout, MILLISECONDS) would convert in the wrong direction
+            long millis = unit.toMillis(timeout);
+            t.waitFinished(millis);
+            if (cancelled.get()) {
+                throw new CancellationException();
+            }
+            return null;
+        }
+    }
+
+    /** Repeating task whose next run time is computed from the time of the first run. */
+    private static final class FixedRateTask extends TaskFutureWrapper {
+        private final Object runLock = new Object();
+        private final Object timeLock = new Object();
+        //must be accessed holding timeLock
+        private int runCount;
+        private long nextRunTime;
+        private long start = Long.MIN_VALUE;
+        volatile boolean firstRun = true;
+        FixedRateTask (Runnable run, long initialDelay, long period) {
+            super (run, initialDelay, period);
+        }
+
+        @Override
+        public void run() {
+            if (firstRun) {
+                synchronized (timeLock) {
+                    start = System.currentTimeMillis();
+                    firstRun = false;
+                }
+            }
+            try {
+                // Never run the user's runnable on two threads at once
+                synchronized(runLock) {
+                    toRun.run();
+                }
+            } catch (RuntimeException e) {
+                // A failing run ends the series
+                cancel(true);
+                throw e;
+            }
+            reschedule();
+        }
+
+        private void reschedule() {
+            //All access to nextRunTime & runCount under lock.
+            long interval;
+            synchronized (timeLock) {
+                nextRunTime = start + (initialDelay + period * runCount);
+                runCount++;
+                interval = Math.max(0, nextRunTime - System.currentTimeMillis());
+            }
+            boolean canContinue = !cancelled.get() && !Thread.currentThread().isInterrupted();
+            if (canContinue) {
+                t.schedule(interval);
+            }
+        }
+
+        @Override
+        public long getDelay(TimeUnit unit) {
+            if (isCancelled()) {
+                return Long.MAX_VALUE;
+            }
+            long delay;
+            synchronized (timeLock) {
+                // Math.min(0, ..) could never report a positive remaining delay;
+                // clamp at zero the same way reschedule() does.
+                delay = Math.max(0, nextRunTime - System.currentTimeMillis());
+            }
+            return unit.convert(delay, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /** Repeating task which is re-enqueued after each run completes. */
+    private static final class FixedDelayTask extends TaskFutureWrapper {
+        private volatile boolean firstRun = true;
+        private final AtomicLong nextRunTime = new AtomicLong();
+        FixedDelayTask(Runnable run, long initialDelay, long period) {
+            super (run, initialDelay, period);
+        }
+
+        @Override
+        public long getDelay(TimeUnit unit) {
+            long next = nextRunTime.get();
+            return unit.convert (next - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        }
+
+        @Override
+        public void run() {
+            if (!fini()) {
+                toRun.run();
+            }
+            if (!fini()) {
+                reschedule();
+            }
+        }
+
+        /** @return true if this task was cancelled or its thread interrupted */
+        private boolean fini() {
+            boolean result = cancelled.get() || Thread.currentThread().isInterrupted();
+            return result;
+        }
+
+        private void reschedule() {
+            long delay;
+            if (firstRun) {
+                delay = initialDelay;
+            } else {
+                delay = period;
+            }
+            nextRunTime.set(System.currentTimeMillis() + delay);
+            firstRun = false;
+            if (!fini()) {
+                // Use the long overload; an (int) cast truncates long delays
+                t.schedule(delay);
+            }
+        }
+    }
+
+    /** Implemented by internal wrappers which can reveal the user's runnable. */
+    private interface RunnableWrapper {
+        Runnable getRunnable();
+    }
+
+    /**
+     * Callable used by invokeAll/invokeAny: delegates to the user's callable
+     * and counts down a latch. When a result reference is supplied the result
+     * is stored there, and a run which threw a RuntimeException or Error does
+     * not count down.
+     */
+    private static final class WaitableCallable<T> implements Callable<T>, Cancellable {
+        private final CountDownLatch countdown;
+        private final Callable<T> delegate;
+        private final AtomicReference<T> ref;
+        private volatile boolean failed;
+        WaitableCallable(Callable<T> delegate, CountDownLatch countdown) {
+            this (delegate, null, countdown);
+        }
+
+        WaitableCallable(Callable<T> delegate, AtomicReference<T> ref, CountDownLatch countdown) {
+            this.delegate = delegate;
+            this.countdown = countdown;
+            this.ref = ref;
+        }
+
+        boolean failed() {
+            return failed;
+        }
+
+        @Override
+        public T call() throws Exception {
+            try {
+                T result = delegate.call();
+                if (ref != null) {
+                    ref.set(result);
+                }
+                return result;
+            } catch (RuntimeException e) {
+                failed = true;
+                throw e;
+            } catch (Error e) {
+                failed = true;
+                throw e;
+            } finally {
+                if (!failed || ref == null) {
+                    countdown.countDown();
+                }
+            }
+        }
+
+        @Override
+        public boolean cancel() {
+            return delegate instanceof Cancellable ? ((Cancellable) delegate).cancel() : true;
+        }
+    }
+
+    /**
+     * FutureTask bound to the RequestProcessor Task which runs it, so that
+     * cancelling the future also cancels the task.
+     */
+    private static class RPFutureTask<T> extends FutureTask<T> implements RunnableWrapper {
+        protected volatile Task task;
+        private final Runnable runnable;
+        private final Cancellable cancellable;
+        RPFutureTask(Callable<T> c) {
+            super (c);
+            this.runnable = null;
+            this.cancellable = c instanceof Cancellable ? (Cancellable) c : null;
+        }
+
+        RPFutureTask(Runnable r, T result) {
+            super (r, result);
+            this.runnable = r;
+            this.cancellable = r instanceof Cancellable ? (Cancellable) r : null;
+        }
+
+        void setTask(Task task) {
+            this.task = task;
+        }
+
+        RPFutureTask(Callable<T> c, T predefinedResult) {
+            this (c);
+            set(predefinedResult);
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            // The job itself may veto cancellation
+            boolean result = cancellable == null ? true : cancellable.cancel();
+            if (result) {
+                boolean taskCancelled = task.cancel();
+                boolean superCancel = super.cancel(mayInterruptIfRunning); //must call both!
+                result = taskCancelled && superCancel;
+            }
+            return result;
+        }
+
+        @Override
+        public Runnable getRunnable() {
+            return this.runnable;
+        }
+    }
+
+    /** RPFutureTask which also reports the delay it was scheduled with. */
+    private static final class ScheduledRPFutureTask<T> extends RPFutureTask<T> implements ScheduledFuture<T> {
+        protected final long delayMillis;
+        ScheduledRPFutureTask(Callable<T> c, long delayMillis) {
+            super (c);
+            this.delayMillis = delayMillis;
+        }
+
+        ScheduledRPFutureTask(Runnable r, T result, long delayMillis) {
+            super (r, result);
+            this.delayMillis = delayMillis;
+        }
+
+
+        @Override
+        public long getDelay(TimeUnit unit) {
+            // Convert FROM milliseconds TO the requested unit
+            return unit.convert(delayMillis, TimeUnit.MILLISECONDS);
+        }
+
+        @Override
+        public int compareTo(Delayed o) {
+            // Compare rather than subtract: the (int) cast of a difference overflows
+            long otherDelayMillis = o.getDelay(TimeUnit.MILLISECONDS);
+            return delayMillis < otherDelayMillis ? -1 : (delayMillis == otherDelayMillis ? 0 : 1);
+        }
+    }
+
     private class EnqueueTask extends TimerTask {
         Item itm;
 
@@ -610,6 +1389,7 @@
         private int priority = Thread.MIN_PRIORITY;
         private long time = 0;
         private Thread lastThread = null;
+        private AtomicBoolean cancelled;
 
         /** @param run runnable to start
          * @param delay amount of millis to wait
@@ -676,6 +1456,10 @@
          * @param delay time in milliseconds to wait (starting from now)
          */
         public void schedule(int delay) {
+            schedule((long) delay);
+        }
+
+        void schedule(long delay) {
             if (stopped) {
                 throw new IllegalStateException("RequestProcessor already stopped!"); // NOI18N
             }
@@ -685,6 +1469,9 @@
             final Item localItem;
 
             synchronized (processorLock) {
+                if (cancelled != null) {
+                    cancelled.set(false);
+                }
                 notifyRunning();
 
                 if (item != null) {
@@ -747,6 +1534,49 @@
             }
         }
 
+        /**
+         * Implementation of cancel for use with Future objects, to guarantee
+         * that the thread will be interrupted, no matter what the setting
+         * on the owning RP.
+         * @param interrupt If true, the thread should be interrupted
+         * @return true if cancellation occurred
+         */
+        boolean cancel (boolean interrupt) {
+            synchronized (processorLock) {
+                if (cancelled != null) {
+                    // getAndSet returns the PREVIOUS value: only a task which
+                    // was already cancelled has nothing left to cancel.
+                    boolean wasCancelled = cancelled.getAndSet(true);
+                    if (wasCancelled) {
+                        return false;
+                    }
+                }
+                boolean success;
+
+                if (item == null) {
+                    success = false;
+                } else {
+                    Processor p = item.getProcessor();
+                    success = item.clear(null);
+
+                    if (p != null) {
+                        if (interrupt) {
+                            success = p.interrupt(this, RequestProcessor.this);
+                        } else {
+                            //despite its name, will not actually interrupt
+                            //unless the RP specifies that it should
+                            p.interruptTask(this, RequestProcessor.this);
+                        }
+                        if (success) {
+                            item = null;
+                        }
+                    }
+                }
+                if (success) {
+                    notifyFinished(); // mark it as finished
+                }
+                return success;
+            }
+        }
+
         /** Current priority of the task.
          * @return the priority level (see e.g. {@link Thread#NORM_PRIORITY}
          */
@@ -1062,6 +1892,12 @@
             }
         }
 
+        boolean belongsTo(RequestProcessor r) {
+            synchronized (lock) {
+                return source == r;
+            }
+        }
+
         /**
          * The method that will repeatedly wait for a request and perform it.
*/
@@ -1190,6 +2026,14 @@
             }
         }
 
+        /**
+         * Interrupts this processor thread, but only if it is currently
+         * assigned the given task.
+         * @param t the task expected to be this processor's current one
+         * @param src the requesting request processor (not consulted here)
+         * @return true if the thread was interrupted, false if it is
+         * assigned a different task
+         */
+        boolean interrupt (Task t, RequestProcessor src) {
+            if (t != todo) {
+                return false;
+            }
+            interrupt();
+            return true;
+        }
+
         /** @see "#20467" */
         private static void doNotify(RequestProcessor.Task todo, Throwable ex) {
             if (SLOW && todo.item.message == null) {
diff --git a/openide.util/src/org/openide/util/ReschedulableFuture.java b/openide.util/src/org/openide/util/ReschedulableFuture.java
new file mode 100644
--- /dev/null
+++ b/openide.util/src/org/openide/util/ReschedulableFuture.java
@@ -0,0 +1,75 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ *
+ * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
+ *
+ * The contents of this file are subject to the terms of either the GNU
+ * General Public License Version 2 only ("GPL") or the Common
+ * Development and Distribution License("CDDL") (collectively, the
+ * "License"). You may not use this file except in compliance with the
+ * License. You can obtain a copy of the License at
+ * http://www.netbeans.org/cddl-gplv2.html
+ * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the
+ * specific language governing permissions and limitations under the
+ * License. When distributing the software, include this License Header
+ * Notice in each file and include the License file at
+ * nbbuild/licenses/CDDL-GPL-2-CP. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the GPL Version 2 section of the License file that
+ * accompanied this code.
If applicable, add the following below the
+ * License Header, with the fields enclosed by brackets [] replaced by
+ * your own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * If you wish your version of this file to be governed by only the CDDL
+ * or only the GPL Version 2, indicate your decision by adding
+ * "[Contributor] elects to include this software in this distribution
+ * under the [CDDL or GPL Version 2] license." If you do not indicate a
+ * single choice of license, a recipient has the option to distribute
+ * your version of this file under either the CDDL, the GPL Version 2 or
+ * to extend the choice of license to its licensees as provided above.
+ * However, if you add GPL Version 2 code and therefore, elected the GPL
+ * Version 2 license, then the option applies only if the new code is
+ * made subject to such option by the copyright holder.
+ *
+ * Contributor(s):
+ *
+ * Portions Copyrighted 2010 Sun Microsystems, Inc.
+ */
+
+package org.openide.util;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Future which can be run more than once: it can be scheduled, cancelled
+ * and scheduled again.
+ *
+ * @author Tim Boudreau
+ */
+public interface ReschedulableFuture extends ScheduledFuture{
+    /**
+     * Schedule this future to be run. If this task is already enqueued to run
+     * after some delay, the task will now run after the passed delay, regardless
+     * of whether it is sooner or later than the original delay. If this
+     * task has been cancelled, it is re-enqueued.
+     *
+     * @param delay The delay, expressed in <code>unit</code>
+     * @param unit The time unit of the delay
+     * @return A ScheduledFuture (which may not == this) which can be
+     * monitored
+     */
+    ScheduledFuture schedule (long delay, TimeUnit unit);
+    /**
+     * Schedule this future to be run. If this task is already enqueued to run
+     * after some delay, the task will now run after the passed delay, regardless
+     * of whether it is sooner or later than the original delay. If this
+     * task has been cancelled, it is re-enqueued.
+     *
+     * @param delayMillis The delay in milliseconds
+     * @return A ScheduledFuture (which may not == this) which can be
+     * monitored
+     */
+    ScheduledFuture schedule (long delayMillis);
+}
diff --git a/openide.util/src/org/openide/util/ReschedulableFutureImpl.java b/openide.util/src/org/openide/util/ReschedulableFutureImpl.java
new file mode 100644
--- /dev/null
+++ b/openide.util/src/org/openide/util/ReschedulableFutureImpl.java
@@ -0,0 +1,131 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ *
+ * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
+ *
+ * The contents of this file are subject to the terms of either the GNU
+ * General Public License Version 2 only ("GPL") or the Common
+ * Development and Distribution License("CDDL") (collectively, the
+ * "License"). You may not use this file except in compliance with the
+ * License. You can obtain a copy of the License at
+ * http://www.netbeans.org/cddl-gplv2.html
+ * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the
+ * specific language governing permissions and limitations under the
+ * License. When distributing the software, include this License Header
+ * Notice in each file and include the License file at
+ * nbbuild/licenses/CDDL-GPL-2-CP. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the GPL Version 2 section of the License file that
+ * accompanied this code.
If applicable, add the following below the
+ * License Header, with the fields enclosed by brackets [] replaced by
+ * your own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * If you wish your version of this file to be governed by only the CDDL
+ * or only the GPL Version 2, indicate your decision by adding
+ * "[Contributor] elects to include this software in this distribution
+ * under the [CDDL or GPL Version 2] license." If you do not indicate a
+ * single choice of license, a recipient has the option to distribute
+ * your version of this file under either the CDDL, the GPL Version 2 or
+ * to extend the choice of license to its licensees as provided above.
+ * However, if you add GPL Version 2 code and therefore, elected the GPL
+ * Version 2 license, then the option applies only if the new code is
+ * made subject to such option by the copyright holder.
+ *
+ * Contributor(s):
+ *
+ * Portions Copyrighted 2010 Sun Microsystems, Inc.
+ */
+
+package org.openide.util;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * ReschedulableFuture backed by a RequestProcessor.Task; scheduling and
+ * cancelling are delegated to the task, and the cancelled state is tracked
+ * here so it can be reset on every reschedule.
+ *
+ * @author Tim Boudreau
+ */
+final class ReschedulableFutureImpl implements ReschedulableFuture {
+    private final RequestProcessor.Task task;
+    private final AtomicBoolean cancelled = new AtomicBoolean();
+    private final Runnable toRun;
+    ReschedulableFutureImpl(Runnable run, RequestProcessor.Task task) {
+        this.task = task;
+        this.toRun = run;
+    }
+
+    @Override
+    public ScheduledFuture<Void> schedule(long delay, TimeUnit unit) {
+        // unit.convert(delay, MILLISECONDS) would convert in the wrong direction
+        schedule(unit.toMillis(delay));
+        return this;
+    }
+
+    @Override
+    public ScheduledFuture<Void> schedule(long delayMillis) {
+        // Rescheduling revives a cancelled future
+        cancelled.set(false);
+        task.schedule(delayMillis);
+        return this;
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+        return unit.convert (task.getDelay(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+        long val = getDelay(TimeUnit.MILLISECONDS);
+        long other = o.getDelay(TimeUnit.MILLISECONDS);
+        // Compare rather than subtract: (int)(val - other) overflows
+        return val < other ? -1 : (val == other ? 0 : 1);
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        // Only the first cancel since the last schedule() does any work
+        if (cancelled.compareAndSet(false, true)) {
+            boolean jobCancelled = toRun instanceof Cancellable ? ((Cancellable) toRun).cancel() : true;
+            boolean taskCancelled = task.cancel(mayInterruptIfRunning);
+            return jobCancelled && taskCancelled;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return cancelled.get();
+    }
+
+    @Override
+    public boolean isDone() {
+        return cancelled.get() || task.isFinished();
+    }
+
+    @Override
+    public Void get() throws InterruptedException, ExecutionException {
+        if (cancelled.get()) {
+            throw new CancellationException();
+        }
+        task.waitFinished();
+        if (cancelled.get()) {
+            throw new CancellationException();
+        }
+        return null;
+    }
+
+    @Override
+    public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        if (cancelled.get()) {
+            throw new CancellationException();
+        }
+        // unit.convert(timeout, MILLISECONDS) would convert in the wrong direction
+        task.waitFinished(unit.toMillis(timeout));
+        if (cancelled.get()) {
+            throw new CancellationException();
+        }
+        return null;
+    }
+}
diff --git a/openide.util/test/unit/src/org/openide/util/RequestProcessor180386Test.java b/openide.util/test/unit/src/org/openide/util/RequestProcessor180386Test.java
new file mode 100644
--- /dev/null
+++ b/openide.util/test/unit/src/org/openide/util/RequestProcessor180386Test.java
@@ -0,0 +1,1500 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ *
+ * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
+ * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common + * Development and Distribution License("CDDL") (collectively, the + * "License"). You may not use this file except in compliance with the + * License. You can obtain a copy of the License at + * http://www.netbeans.org/cddl-gplv2.html + * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the + * specific language governing permissions and limitations under the + * License. When distributing the software, include this License Header + * Notice in each file and include the License file at + * nbbuild/licenses/CDDL-GPL-2-CP. Sun designates this + * particular file as subject to the "Classpath" exception as provided + * by Sun in the GPL Version 2 section of the License file that + * accompanied this code. If applicable, add the following below the + * License Header, with the fields enclosed by brackets [] replaced by + * your own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * If you wish your version of this file to be governed by only the CDDL + * or only the GPL Version 2, indicate your decision by adding + * "[Contributor] elects to include this software in this distribution + * under the [CDDL or GPL Version 2] license." If you do not indicate a + * single choice of license, a recipient has the option to distribute + * your version of this file under either the CDDL, the GPL Version 2 or + * to extend the choice of license to its licensees as provided above. + * However, if you add GPL Version 2 code and therefore, elected the GPL + * Version 2 license, then the option applies only if the new code is + * made subject to such option by the copyright holder. + * + * Contributor(s): + * + * Portions Copyrighted 2010 Sun Microsystems, Inc. 
+ */ +package org.openide.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import junit.framework.Test; +import org.netbeans.junit.NbTestCase; +import org.netbeans.junit.NbTestSuite; +import org.netbeans.junit.RandomlyFails; + +/** + * + * @author Tim Boudreau + */ +public class RequestProcessor180386Test extends NbTestCase { + + public RequestProcessor180386Test(java.lang.String testName) { + super(testName); + } + + public static Test suite() { + Test t = null; + if (t == null) { + t = new NbTestSuite(RequestProcessor180386Test.class); + } + return t; + } + + public void testSubmit() throws Exception { + class C implements Callable { + + volatile boolean hasRun; + + @Override + public String call() throws Exception { + String result = "Hello"; + hasRun = true; + return result; + } + } + C c = new C(); + Future f = RequestProcessor.getDefault().submit(c); + assertEquals("Hello", f.get()); + assertTrue(c.hasRun); + + class R implements Runnable { + + volatile boolean hasRun; + + @Override + public void run() { + hasRun = true; + } + } + R r = new R(); + f = RequestProcessor.getDefault().submit(r, "Goodbye"); + assertEquals("Goodbye", f.get()); + assertTrue(r.hasRun); + } + + public void testSomeTasksNotRunIfShutDown() throws Exception { + final Object lock = new Object(); + int count = 10; + final CountDownLatch waitAllLaunched = new CountDownLatch(count); + final CountDownLatch waitOneFinished = new CountDownLatch(1); + RequestProcessor rp = new 
RequestProcessor("TestRP", count * 2); + class R implements Runnable { + + volatile boolean hasStarted; + volatile boolean hasFinished; + + @Override + public void run() { + hasStarted = true; + waitAllLaunched.countDown(); + synchronized (lock) { + try { + lock.wait(); + if (Thread.interrupted()) { + return; + } + } catch (InterruptedException ex) { + return; + } finally { + waitOneFinished.countDown(); + } + hasFinished = true; + } + } + } + Set> s = new HashSet>(); + Set rs = new HashSet(); + for (int i = 0; i < count; i++) { + String currName = "Runnable " + i; + R r = new R(); + rs.add(r); + s.add(rp.submit(r, currName)); + } + waitAllLaunched.await(); + synchronized (lock) { + //Notify just one thread + lock.notify(); + } + waitOneFinished.await(); + List notRun = rp.shutdownNow(); + synchronized (lock) { + lock.notifyAll(); + } + boolean allFinished = true; + int finishedCount = 0; + for (R r : rs) { + assertTrue(r.hasStarted); + allFinished &= r.hasFinished; + if (r.hasFinished) { + finishedCount++; + } + } + assertFalse("All tasks should not have completed", allFinished); + assertTrue("At least one task shall complete", finishedCount >= 1); + assertFalse(notRun.isEmpty()); + assertTrue(rp.isShutdown()); + //Technically not provable due to "spurious wakeups" + // assertEquals (1, finishedCount); + + try { + RequestProcessor.getDefault().shutdown(); + fail("Should not be able to shutdown() default RP"); + } catch (Exception e) { + } + try { + RequestProcessor.getDefault().shutdownNow(); + fail("Should not be able to shutdownNow() default RP"); + } catch (Exception e) { + } + for (Runnable nr : notRun) { + assertTrue ("Shutdown is not returning submitted runnables - got a " + + nr.getClass().getName() + " instead of " + + R.class.getName(), nr.getClass() == R.class); + } + } + + public void testAwaitTermination() throws Exception { + int count = 20; + final Object lock = new Object(); + final CountDownLatch waitAllLaunched = new CountDownLatch(count); + final 
CountDownLatch waitAll = new CountDownLatch(count); + final RequestProcessor rp = new RequestProcessor("TestRP", count); + class R implements Runnable { + + volatile boolean hasStarted; + volatile boolean hasFinished; + + @Override + public void run() { + hasStarted = true; + waitAllLaunched.countDown(); + synchronized (lock) { + try { + lock.wait(); + if (Thread.interrupted()) { + return; + } + } catch (InterruptedException ex) { + return; + } finally { + hasFinished = true; + waitAll.countDown(); + } + } + } + } + Set> s = new HashSet>(); + Set rs = new HashSet(); + for (int i = 0; i < count; i++) { + String currName = "Runnable " + i; + R r = new R(); + rs.add(r); + s.add(rp.submit(r, currName)); + } + waitAllLaunched.await(); + synchronized (lock) { + //Notify just one thread + lock.notifyAll(); + } + rp.shutdown(); + boolean awaitTermination = rp.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + assertTrue(awaitTermination); + assertTrue(rp.isShutdown()); + assertTrue(rp.isTerminated()); + } + + @RandomlyFails + public void testAwaitTerminationWaitsForNewlyAddedThreads() throws Exception { + final RequestProcessor rp = new RequestProcessor("testAwaitTerminationWaitsForNewlyAddedThreads", 50, false); + int count = 30; + final CountDownLatch waitLock = new CountDownLatch(1); + class R implements Runnable { + boolean done; + @Override + public void run() { + try { + waitLock.await(); + } catch (InterruptedException ex) { + done = true; + } finally { + done = true; + } + } + } + Set rs = new HashSet(); + for (int i= 0; i < count; i++) { + R r = new R(); + rs.add(r); + rp.submit(r); + } + final CountDownLatch shutdownBegun = new CountDownLatch(1); + Runnable shutdowner = new Runnable() { + @Override + public void run() { + try { + Thread.sleep(1000); + rp.shutdown(); + shutdownBegun.countDown(); + } catch (InterruptedException ex) { + Exceptions.printStackTrace(ex); + } + } + }; + waitLock.countDown(); + new Thread(shutdowner).start(); + 
assertTrue(rp.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS)); + Thread.sleep (600); + assertTrue (rp.isTerminated()); + } + + public void testInvokeAll() throws Exception { + int count = 20; + final CountDownLatch waitAll = new CountDownLatch(count); + final RequestProcessor rp = new RequestProcessor("TestRP", count); + try { + class C implements Callable { + + private final String result; + volatile boolean ran; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + ran = true; + waitAll.countDown(); + return result; + } + } + List callables = new ArrayList(count); + List> fs; + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; + names.add(name); + C c = new C(name); + callables.add(c); + } + fs = rp.invokeAll(callables); + + assertNotNull(fs); + waitAll.await(); + assertEquals(0, waitAll.getCount()); + for (Future f : fs) { + assertTrue (f.isDone()); + } + for (C c : callables) { + assertTrue (c.ran); + } + Set s = new HashSet(count); + for (Future f : fs) { + s.add(f.get()); + } + assertEquals(names, s); + } finally { + rp.stop(); + } + } + + public void testInvokeAllWithTimeout() throws Exception { + int count = 20; + final CountDownLatch blocker = new CountDownLatch(1); + final RequestProcessor rp = new RequestProcessor("TestRP", count); + try { + class C implements Callable { + volatile boolean iAmSpecial; + + private final String result; + volatile boolean ran; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + //Only one will be allowed to run, the rest + //will be cancelled + if (!iAmSpecial) { + blocker.await(); + } + ran = true; + return result; + } + } + List callables = new ArrayList(count); + C special = new C("Special"); + special.iAmSpecial = true; + callables.add(special); + List> fs; + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; + 
names.add(name); + C c = new C(name); + callables.add(c); + } + fs = rp.invokeAll(callables, 1000, TimeUnit.MILLISECONDS); + assertNotNull(fs); + for (Future f : fs) { + assertTrue (f.isDone()); + } + for (C c : callables) { + if (c == special) { + assertTrue (c.ran); + } else { + assertFalse(c.ran); + } + } + } finally { + rp.stop(); + } + } + + public void testInvokeAllSingleThread() throws Exception { + int count = 20; + final CountDownLatch waitAll = new CountDownLatch(count); + final RequestProcessor rp = new RequestProcessor("TestRP", 1); + class C implements Callable { + + private final String result; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + waitAll.countDown(); + return result; + } + } + List l = new ArrayList(count); + List> fs; + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; + names.add(name); + C c = new C(name); + l.add(c); + } + fs = rp.invokeAll(l); + assertNotNull(fs); + Set s = new HashSet(count); + for (Future f : fs) { + s.add(f.get()); + } + assertEquals(names, s); + } + + public void testInvokeAny() throws Exception { + int count = 20; + final RequestProcessor rp = new RequestProcessor("TestRP", count + 1); + class C implements Callable { + + private final String result; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + if (Thread.interrupted()) { + return null; + } + return result; + } + } + List l = new ArrayList(count); + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; + names.add(name); + C c = new C(name); + l.add(c); + } + String res = rp.invokeAny(l); + assertNotNull(res); + assertTrue(res.startsWith("R")); + } + + public void testInvokeAnySingleThread() throws Exception { + int count = 1000; + final RequestProcessor rp = new RequestProcessor("TestRP", 20); + final CountDownLatch latch = new CountDownLatch(count); + class C 
implements Callable { + + volatile boolean hasRun; + private final String result; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + latch.countDown(); + if (!"R17".equals(result)) { + try { + //Block all but one thread until threads have entered + latch.await(); + } catch (InterruptedException ie) { + } + } + Thread.yield(); + hasRun = true; + return result; + } + } + List l = new ArrayList(count); + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; + names.add(name); + C c = new C(name); + l.add(c); + } + String res = rp.invokeAny(l); + assertNotNull(res); + assertTrue(res.startsWith("R")); + int runCount = 0; + for (C c : l) { + if (c.hasRun) { + runCount++; + } + } + assertTrue("Not all " + count + " threads should have completed, but " + runCount + " did.", runCount < count); + } + + public void testInvokeAnyWithTimeout() throws Exception { + int count = 20; + final RequestProcessor rp = new RequestProcessor("TestRP", count + 1); + final CountDownLatch latch = new CountDownLatch(1); + class C implements Callable { + + volatile boolean hasRun; + private final String result; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + latch.await(); + if (Thread.interrupted()) { + return null; + } + hasRun = true; + return result; + } + } + List l = new ArrayList(count); + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; + names.add(name); + C c = new C(name); + l.add(c); + } + //All threads are waiting on latch; we should time out + String res = rp.invokeAny(l, 400, TimeUnit.MILLISECONDS); + assertNull(res); + for (C c : l) { + assertFalse(c.hasRun); + } + } + + public void testCancellation() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + class C implements Callable { + + volatile boolean hasRun; + volatile boolean interrupted; + + @Override + 
public String call() throws Exception { + try { + latch.await(); + } catch (InterruptedException e) { + interrupted = true; + return null; + } + if (Thread.interrupted()) { + interrupted = true; + return null; + } + hasRun = true; + return "Hello"; + } + } + C c = new C(); + Future f = RequestProcessor.getDefault().submit(c); + f.cancel(true); + latch.countDown(); + String s = null; + try { + s = f.get(); + fail("CancellationException should have been thrown"); + } catch (CancellationException e) { + } + assertNull(s); + assertTrue(c.interrupted || !c.hasRun); + assertFalse(c.hasRun); + } + + public void testCancellablesGetCancelInvokedWithCallable() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch exit = new CountDownLatch(1); + class C implements Callable, Cancellable { + + volatile boolean hasRun; + volatile boolean interrupted; + volatile boolean cancelled; + + @Override + public String call() throws Exception { + try { + try { + latch.await(); + } catch (InterruptedException e) { + interrupted = true; + return null; + } + if (Thread.interrupted()) { + interrupted = true; + return null; + } + if (cancelled) { + return null; + } + hasRun = true; + return "Hello"; + } finally { + exit.countDown(); + } + } + + @Override + public boolean cancel() { + cancelled = true; + exit.countDown(); + return true; + } + } + C c = new C(); + Future f = RequestProcessor.getDefault().submit(c); + f.cancel(true); + assertTrue (c.cancelled); + latch.countDown(); + exit.await(); + String s = null; + try { + s = f.get(); + fail ("Should have gotten cancellation exception"); + } catch (CancellationException e) { + + } + } + + public void testCancellablesGetCancelInvokedWithRunnable() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch exit = new CountDownLatch(1); + class C implements Runnable, Cancellable { + + volatile boolean hasRun; + volatile boolean interrupted; + volatile boolean 
cancelled; + + @Override + public void run() { + try { + try { + latch.await(); + } catch (InterruptedException e) { + interrupted = true; + return; + } + if (Thread.interrupted()) { + interrupted = true; + return; + } + if (cancelled) { + return; + } + hasRun = true; + } finally { + exit.countDown(); + } + } + + @Override + public boolean cancel() { + cancelled = true; + exit.countDown(); + return true; + } + } + C c = new C(); + Future f = RequestProcessor.getDefault().submit(c); + f.cancel(true); + assertTrue (c.cancelled); + latch.countDown(); + exit.await(); + try { + f.get(); + fail ("Should have gotten cancellation exception"); + } catch (CancellationException e) { + + } + assertFalse (c.hasRun); + } + + public void testCancellablesThatSayTheyCantBeCancelledAreNotCancelledViaFutureDotCancel() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch exit = new CountDownLatch(1); + class C implements Runnable, Cancellable { + + volatile boolean hasRun; + volatile boolean interrupted; + volatile boolean cancelCalled; + + @Override + public void run() { + try { + try { + latch.await(); + } catch (InterruptedException e) { + interrupted = true; + throw new AssertionError(e); + } + if (Thread.interrupted()) { + interrupted = true; + throw new AssertionError("Thread should not have been interrupted"); + } + hasRun = true; + } finally { + exit.countDown(); + } + } + + @Override + public boolean cancel() { + cancelCalled = true; + return false; + } + } + C c = new C(); + Future f = RequestProcessor.getDefault().submit(c); + f.cancel(true); + assertFalse (f.isCancelled()); + assertTrue (c.cancelCalled); + latch.countDown(); + exit.await(); + f.get(); + assertFalse (f.isCancelled()); + assertTrue (c.hasRun); + } + + public void testInvokeAllCancellation() throws Exception { + int count = 20; + final CountDownLatch waitAll = new CountDownLatch(count); + final RequestProcessor rp = new RequestProcessor("TestRP", count); + class C 
implements Callable, Cancellable { + + private final String result; + volatile boolean cancelCalled; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + waitAll.countDown(); + return cancelCalled ? null : result; + } + + @Override + public boolean cancel() { + cancelCalled = true; + return false; + } + } + List l = new ArrayList(count); + List> fs; + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; + names.add(name); + C c = new C(name); + l.add(c); + } + fs = rp.invokeAll(l); + assertNotNull(fs); + Set s = new HashSet(count); + for (Future f : fs) { + s.add(f.get()); + } + assertEquals(names, s); + } + + public void testCannotScheduleLongerThanIntegerMaxValue() throws Exception { + Runnable r = new Runnable() { + + @Override + public void run() { + fail ("Should not have been run"); + } + }; + try { + Future f = RequestProcessor.getDefault().schedule(r, Long.MAX_VALUE, TimeUnit.DAYS); + f.cancel(true); + } catch (Exception e) {} + } + + public void testCannotScheduleNegativeDelay() throws Exception { + Runnable r = new Runnable() { + + @Override + public void run() { + fail ("Should not have been run"); + } + }; + try { + RequestProcessor.getDefault().schedule(r, -1L, TimeUnit.MILLISECONDS); + fail ("Negative value accepetd"); + } catch (Exception e) {} + try { + RequestProcessor.getDefault().scheduleAtFixedRate(r, -1L, 22L, TimeUnit.MILLISECONDS); + fail ("Negative value accepetd"); + } catch (Exception e) {} + try { + RequestProcessor.getDefault().scheduleAtFixedRate(r, 200, -22L, TimeUnit.MILLISECONDS); + fail ("Negative value accepetd"); + } catch (Exception e) {} + try { + RequestProcessor.getDefault().scheduleWithFixedDelay(r, -1L, 22L, TimeUnit.MILLISECONDS); + fail ("Negative value accepetd"); + } catch (Exception e) {} + try { + RequestProcessor.getDefault().scheduleWithFixedDelay(r, 1L, -22L, TimeUnit.MILLISECONDS); + fail ("Negative value accepetd"); + } 
catch (Exception e) {} + } + + public void testTaskCanRescheduleItself() throws Exception { + final CountDownLatch latch = new CountDownLatch(2); + class R implements Runnable { + volatile RequestProcessor.Task task; + volatile int runCount; + @Override + public void run() { + runCount++; + if (runCount == 1) { + task.schedule(0); + } + latch.countDown(); + } + } + R r = new R(); + RequestProcessor.Task t = RequestProcessor.getDefault().create(r); + r.task = t; + t.schedule(0); + latch.await (); + assertEquals (r.runCount, 2); + } + + public void testScheduleRepeatingSanityFixedRate() throws Exception { + final CountDownLatch latch = new CountDownLatch(5); + class C implements Runnable { + volatile int runCount; + @Override + public void run() { + runCount++; + latch.countDown(); + } + } + C c = new C(); + RequestProcessor.getDefault().scheduleWithFixedDelay(c, 0, 20, TimeUnit.MILLISECONDS); +// latch.await(5000, TimeUnit.MILLISECONDS); + latch.await(); + assertEquals (5, c.runCount); + } + + public void testScheduleRepeatingSanityFixedDelay() throws Exception { + final CountDownLatch latch = new CountDownLatch(5); + class C implements Runnable { + volatile int runCount; + @Override + public void run() { + runCount++; + latch.countDown(); + } + } + C c = new C(); + RequestProcessor.getDefault().scheduleAtFixedRate(c, 0, 20, TimeUnit.MILLISECONDS); + latch.await(2000, TimeUnit.MILLISECONDS); + + assertEquals (5, c.runCount); + } + + public void testScheduleOneShot() throws Exception { + RequestProcessor rp = new RequestProcessor ("testScheduleOneShot", 5, true, true); + try { + class C implements Callable { + volatile long start = System.currentTimeMillis(); + private volatile long end; + + @Override + public String call() throws Exception { + synchronized(this) { + end = System.currentTimeMillis(); + } + return "Hello"; + } + + synchronized long elapsed() { + return end - start; + } + } + C c = new C(); + int delay = 5000; + //Use a 20 second timeout to have a 
reasonable chance of accuracy + ScheduledFuture f = rp.schedule(c, delay, TimeUnit.MILLISECONDS); + assertEquals (5000, f.getDelay(TimeUnit.MILLISECONDS)); + assertNotNull(f.get()); + //Allow 4 seconds fudge-factor + assertTrue (c.elapsed() > 4600); + assertTrue (f.isDone()); + } finally { + rp.stop(); + } + } + + public void testScheduleRepeatingIntervalsAreRoughlyCorrect() throws Exception { + int runCount = 5; + final CountDownLatch latch = new CountDownLatch(runCount); + final List intervals = Collections.synchronizedList(new ArrayList (runCount)); +// long initialDelay = 30000; +// long period = 20000; +// long fudgeFactor = 4000; + long initialDelay = 3000; + long period = 2000; + long fudgeFactor = 400; + long expectedInitialDelay = initialDelay - fudgeFactor; + long expectedPeriod = period - fudgeFactor; + class C implements Runnable { + volatile long start = System.currentTimeMillis(); + private int runCount; + @Override + public void run() { + runCount++; + try { + synchronized(this) { + long end = System.currentTimeMillis(); + intervals.add (end - start); + start = end; + } + } finally { + latch.countDown(); + } + } + } + C c = new C(); + RequestProcessor rp = new RequestProcessor ("testScheduleRepeating", 5, true); + try { + Future f = rp.scheduleWithFixedDelay(c, initialDelay, period, TimeUnit.MILLISECONDS); + // latch.await(initialDelay + fudgeFactor + ((runCount - 1) * (period + fudgeFactor)), TimeUnit.MILLISECONDS); //XXX + latch.await(); + f.cancel(true); + for (int i= 0; i < Math.min(runCount, intervals.size()); i++) { + long expect = i == 0 ? 
expectedInitialDelay : expectedPeriod; + assertTrue ("Expected at least " + expect + " milliseconds before run " + i + " but was " + intervals.get(i), intervals.get(i) >= expect); + } + //Ensure we have really exited + try { + f.get(); + fail ("CancellationException should have been thrown"); + } catch (CancellationException e) {} + assertTrue(f.isCancelled()); + assertTrue(f.isDone()); + } finally { + rp.stop(); + } + } + + public void testScheduleFixedRateAreRoughlyCorrect() throws Exception { + int runCount = 5; + final CountDownLatch latch = new CountDownLatch(runCount); + final List intervals = Collections.synchronizedList(new ArrayList (runCount)); +// long initialDelay = 30000; +// long period = 20000; +// long fudgeFactor = 4000; + long initialDelay = 3000; + long period = 2000; + long fudgeFactor = 400; + long expectedInitialDelay = initialDelay - fudgeFactor; + long expectedPeriod = period - fudgeFactor; + class C implements Runnable { + volatile long start = System.currentTimeMillis(); + private int runCount; + @Override + public void run() { + runCount++; + try { + synchronized(this) { + long end = System.currentTimeMillis(); + intervals.add (end - start); + start = end; + } + } finally { + latch.countDown(); + } + } + } + C c = new C(); + RequestProcessor rp = new RequestProcessor ("testScheduleFixedRateAreRoughlyCorrect", 5, true); + try { + Future f = rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS); + latch.await(); + f.cancel(true); + for (int i= 0; i < Math.min(runCount, intervals.size()); i++) { + long expect = i == 0 ? 
expectedInitialDelay : expectedPeriod; + assertTrue ("Expected at least " + expect + " milliseconds before run " + i + " but was " + intervals.get(i), intervals.get(i) >= expect); + } + //Ensure we have really exited + try { + f.get(); + fail ("CancellationException should have been thrown"); + } catch (CancellationException e) {} + assertTrue(f.isCancelled()); + assertTrue(f.isDone()); + } finally { + rp.stop(); + } + } + + public void testScheduleFixedRateOnMultiThreadPoolDoesNotCauseConcurrentExecution() throws Exception { + final AtomicInteger val = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(10); + class C implements Runnable { + boolean failed; + @Override + public void run() { + try { + int now = val.incrementAndGet(); + if (now > 1) { + failed = true; + fail (now + " threads simultaneously in run()"); + } + try { + //Intentionally sleep *longer* than the interval + //between executions. We *want* to pile up all of the + //RP threads entering run() - synchronization should + //serialize them. 
This test is to prove that this + //method will never be called concurrently from two threads + Thread.sleep(1000); + } catch (InterruptedException ex) { + + } + } finally { + val.decrementAndGet(); + latch.countDown(); + } + } + } + C c = new C(); + long initialDelay = 2000; + long period = 10; + RequestProcessor rp = new RequestProcessor("testScheduleFixedRateOnMultiThreadPoolDoesNotCauseConcurrentExecution", 10, true); + rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS); + latch.await(); + assertFalse(c.failed); + rp.stop(); + } + + @RandomlyFails + public void testScheduleFixedRateWithShorterIntervalThanRunMethodTimeAreNotDelayed() throws Exception { + final CountDownLatch latch = new CountDownLatch(10); + final List intervals = new CopyOnWriteArrayList(); + class C implements Runnable { + long start = Long.MIN_VALUE; + + @Override + public void run() { + long end = System.currentTimeMillis(); + if (start != Long.MIN_VALUE) { + intervals.add(end - start); + } + try { + Thread.sleep(500); + } catch (InterruptedException ex) { + + } + start = System.currentTimeMillis(); + latch.countDown(); + } + } + C c = new C(); + long initialDelay = 100; + long period = 100; + RequestProcessor rp = new RequestProcessor("testScheduleFixedRateWithShorterIntervalThanRunMethodTimeAreNotDelayed", 10, true); + ScheduledFuture f = rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS); + latch.await(); + f.cancel(true); + rp.stop(); + int max = intervals.size(); + for (int i= 0; i < max; i++) { + long iv = intervals.get(i); + assertFalse ("Interval " + i + " should have been at least less than requested interval * 1.5 with fixed rate" + iv, iv > 150); + } + } + + public void testCancelFutureInterruptsThreadEvenIfRequestProcessorForbidsIt() throws Exception { + RequestProcessor rp = new RequestProcessor ("X", 3, false, true); + final CountDownLatch releaseForRun = new CountDownLatch(1); + final CountDownLatch enterLatch = new CountDownLatch(1); + 
final CountDownLatch exitLatch = new CountDownLatch(1); + class R implements Runnable { + volatile boolean interrupted; + @Override + public void run() { + enterLatch.countDown(); + try { + releaseForRun.await(); + } catch (InterruptedException ex) { + interrupted = true; + } + interrupted |= Thread.interrupted(); + exitLatch.countDown(); + } + } + R r = new R(); + Future f = rp.submit(r); + enterLatch.await(); + f.cancel(true); + assertTrue (f.isCancelled()); + exitLatch.await(); + assertTrue (r.interrupted); + } + + public void testCancelDoesNotInterruptIfNotPassedToFutureDotCancel() throws Exception { + RequestProcessor rp = new RequestProcessor ("X", 3, false, true); + final CountDownLatch releaseForRun = new CountDownLatch(1); + final CountDownLatch enterLatch = new CountDownLatch(1); + final CountDownLatch exitLatch = new CountDownLatch(1); + class R implements Runnable { + volatile boolean interrupted; + @Override + public void run() { + enterLatch.countDown(); + try { + releaseForRun.await(); + } catch (InterruptedException ex) { + interrupted = true; + } + interrupted |= Thread.interrupted(); + exitLatch.countDown(); + } + } + R r = new R(); + Future f = rp.submit(r); + enterLatch.await(); + f.cancel(false); + assertTrue (f.isCancelled()); + assertFalse (r.interrupted); + } + + public void testCancelDoesInterruptIfRequestProcessorSpecifiesItEvenIfFalsePassedToFutureDotCancel() throws Exception { + RequestProcessor rp = new RequestProcessor ("X", 3, true, true); + final CountDownLatch releaseForRun = new CountDownLatch(1); + final CountDownLatch enterLatch = new CountDownLatch(1); + final CountDownLatch exitLatch = new CountDownLatch(1); + class R implements Runnable { + volatile boolean interrupted; + @Override + public void run() { + enterLatch.countDown(); + try { + releaseForRun.await(); + } catch (InterruptedException ex) { + interrupted = true; + } + interrupted |= Thread.interrupted(); + exitLatch.countDown(); + } + } + R r = new R(); + Future f = 
rp.submit(r); + enterLatch.await(); + f.cancel(false); + assertTrue (f.isCancelled()); + exitLatch.await(); + assertTrue (r.interrupted); + } + + public void testReschedulable() throws Exception { + final int count = 25; + final CountDownLatch latch = new CountDownLatch(count); + final RequestProcessor rp = new RequestProcessor("testReschedulable", 5, true); + final AtomicBoolean reschedulerLock = new AtomicBoolean(false); + final AtomicBoolean runExit = new AtomicBoolean(false); + Thread rescheduler = null; + try { + class R implements Runnable { + volatile ReschedulableFuture f; + volatile int runCount; + @Override + public void run() { + runCount++; + latch.countDown(); + if (latch.getCount() > 0) { + reschedulerLock.set(true); + } + runExit.set(true); + } + } + final R r = new R(); + r.f = rp.createReschedulable(r); + assertFalse(r.f.isCancelled()); + assertFalse(r.f.isDone()); + + class Rescheduler implements Runnable { + @Override + public void run() { + //Intentionally use a busywait, not a lock - more overhead, + //but more robust + for(;;) { + try { + Thread.sleep(200); + boolean go = reschedulerLock.get(); + if (go) { + reschedulerLock.set(false); + //ensure the previous run has really exited + do { + Thread.sleep(10); + } while (!runExit.get()); + Thread.sleep(20); + runExit.set(false); + r.f.schedule(10); + } + } catch (InterruptedException ex) { + return; + } + } + } + } + rescheduler = new Thread(new Rescheduler(), "testReschedulable Rescheduler thread"); + rescheduler.start(); + reschedulerLock.set(true); + r.f.schedule(10); + latch.await(); + assertTrue (latch.getCount() == 0); + assertTrue (r.runCount == count); + assertFalse(r.f.isCancelled()); + assertTrue(r.f.isDone()); + assertTrue(runExit.get()); + } finally { + rp.stop(); + if (rescheduler != null) { + rescheduler.interrupt(); + } + } + } + + public void testReschedulableGet() throws Exception { + //Yes, the busywaits are ugly. 
However, they are locking-bug-proof - + //we want to be testing our code that deals with complex inter-thread + //locking - add more locking complexity and what are we testing? + final int count = 25; + final CountDownLatch latch = new CountDownLatch(count); + final RequestProcessor rp = new RequestProcessor("testReschedulable", 1, true); + final AtomicBoolean reschedulerLock = new AtomicBoolean(false); + final AtomicBoolean runExit = new AtomicBoolean(false); + final AtomicBoolean running = new AtomicBoolean(true); + final AtomicBoolean suspend = new AtomicBoolean(false); + final AtomicBoolean waitForSuspend = new AtomicBoolean(false); + Thread rescheduler = null; + try { + class R implements Runnable { + volatile ReschedulableFuture f; + volatile int runCount; + @Override + public void run() { + runCount++; + if (runCount == 20) { + waitForSuspend.set(true); + while (suspend.get()) { + try { + Thread.sleep(200); + } catch (InterruptedException ex) { + + } + } + } + latch.countDown(); + if (latch.getCount() > 0) { + reschedulerLock.set(true); + running.set(true); + } else { + running.set(false); + } + runExit.set(true); + } + } + final R r = new R(); + r.f = rp.createReschedulable(r); + assertFalse(r.f.isCancelled()); + assertFalse(r.f.isDone()); + + class Rescheduler implements Runnable { + @Override + public void run() { + //Intentionally use a busywait, not a lock - more overhead, + //but more robust + for(;;) { + try { + Thread.sleep(200); + boolean go = reschedulerLock.get(); + if (go) { + reschedulerLock.set(false); + //ensure the previous run has really exited + do { + Thread.sleep(10); + } while (!runExit.get()); + Thread.sleep(20); + runExit.set(false); + r.f.schedule(10); + } + } catch (InterruptedException ex) { + return; + } + } + } + } + rescheduler = new Thread(new Rescheduler(), "testReschedulableGet Rescheduler thread"); + rescheduler.start(); + reschedulerLock.set(true); + suspend.set(true); + r.f.schedule(10); + r.f.get(); + while 
(!waitForSuspend.get()); + assertTrue(running.get()); + assertTrue ("get() should have blocked for at least one run", latch.getCount() < count); + assertTrue (r.runCount > 0); + assertTrue (r.runCount < count); + assertFalse(r.f.isCancelled()); + assertFalse(r.f.isDone()); + suspend.set(false); + + } finally { + rp.stop(); + if (rescheduler != null) { + rescheduler.interrupt(); + } + } + } + + public void testReschedulableCancel() throws Exception { + //Yes, the busywaits are ugly. However, they are locking-bug-proof - + //we want to be testing our code that deals with complex inter-thread + //locking - add more locking complexity and what are we testing? + final int count = 25; + final CountDownLatch latch = new CountDownLatch(count); + final RequestProcessor rp = new RequestProcessor("testReschedulable", 1, true); + final AtomicBoolean reschedulerLock = new AtomicBoolean(false); + final AtomicBoolean runExit = new AtomicBoolean(false); + final AtomicBoolean running = new AtomicBoolean(true); + final AtomicBoolean suspend = new AtomicBoolean(false); + final AtomicBoolean waitForSuspend = new AtomicBoolean(false); + final Object lock = new Object(); + Thread rescheduler = null; + try { + class R implements Runnable { + volatile ReschedulableFuture f; + volatile int runCount; + volatile boolean interrupted; + @Override + public void run() { + runCount++; + if (runCount == 20) { + waitForSuspend.set(true); + while (suspend.get()) { + try { + Thread.sleep(200); + } catch (InterruptedException ex) { + interrupted = true; + running.set(false); + } + } + } + latch.countDown(); + if (latch.getCount() > 0) { + reschedulerLock.set(true); + running.set(true); + } else { + running.set(false); + } + runExit.set(true); + synchronized (lock) { + lock.notifyAll(); + } + } + } + final R r = new R(); + r.f = rp.createReschedulable(r); + assertFalse(r.f.isCancelled()); + assertFalse(r.f.isDone()); + + class Rescheduler implements Runnable { + @Override + public void run() { + 
//Intentionally use a busywait, not a lock - more overhead, + //but more robust + for(;;) { + try { + Thread.sleep(200); + boolean go = reschedulerLock.get(); + if (go) { + reschedulerLock.set(false); + //ensure the previous run has really exited + do { + Thread.sleep(10); + } while (!runExit.get()); + Thread.sleep(20); + runExit.set(false); + r.f.schedule(10); + } + } catch (InterruptedException ex) { + return; + } + } + } + } + rescheduler = new Thread(new Rescheduler(), "testReschedulableGet Rescheduler thread"); + rescheduler.start(); + reschedulerLock.set(true); + suspend.set(true); + r.f.schedule(10); + while (!waitForSuspend.get()); + r.f.cancel(true); + while (running.get()); + assertTrue (r.interrupted); + + assertFalse(running.get()); + assertTrue (r.runCount > 0); + assertTrue (r.runCount < count); + assertTrue(r.f.isCancelled()); + assertTrue(r.f.isDone()); + suspend.set(false); + rescheduler.stop(); + runExit.set(false); + r.f.schedule(100); + assertFalse (r.f.isCancelled()); + assertFalse (r.f.isDone()); + synchronized (lock) { + lock.wait(); + } + Thread.sleep (1000); + assertFalse (r.f.isCancelled()); + assertTrue (r.f.isDone()); + } finally { + rp.stop(); + if (rescheduler != null && rescheduler.isAlive()) { + rescheduler.interrupt(); + } + } + } + + static void wait (AtomicBoolean val) { + while (!val.get()) { + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + return; + } + } + val.set(false); + } + + public void testReschedulableCancelAndRestart() throws Exception { + final AtomicBoolean runReleased = new AtomicBoolean(false); + final AtomicBoolean inRun = new AtomicBoolean(false); + class R implements Runnable { + @Override + public void run() { + inRun.set(true); + try { + wait (runReleased); + } finally { + inRun.set(false); + } + } + } + R r = new R(); + RequestProcessor rp = new RequestProcessor("testReschedulableCancelSimple", 5, true); + ReschedulableFuture f = rp.createReschedulable(r, true); + assertTrue (f.isDone()); + 
assertFalse (f.isCancelled()); + f.schedule (100); + wait (inRun); + assertFalse(f.isDone()); + assertFalse(f.isCancelled()); + runReleased.set(true); + f.get(); + assertFalse(f.isCancelled()); + assertTrue (f.isDone()); + + f.schedule(100); + assertFalse(f.isDone()); + assertFalse(f.isCancelled()); + f.cancel(true); + assertTrue (f.isCancelled()); + assertTrue (f.isDone()); + f.schedule(100); + assertFalse (f.isCancelled()); + wait(inRun); + assertFalse(f.isDone()); + assertFalse(f.isCancelled()); + rp.stop(); + } +}