diff --git a/openide.util/manifest.mf b/openide.util/manifest.mf --- a/openide.util/manifest.mf +++ b/openide.util/manifest.mf @@ -1,5 +1,5 @@ Manifest-Version: 1.0 OpenIDE-Module: org.openide.util OpenIDE-Module-Localizing-Bundle: org/openide/util/Bundle.properties -OpenIDE-Module-Specification-Version: 8.1 +OpenIDE-Module-Specification-Version: 8.2 diff --git a/openide.util/src/org/openide/util/RequestProcessor.java b/openide.util/src/org/openide/util/RequestProcessor.java --- a/openide.util/src/org/openide/util/RequestProcessor.java +++ b/openide.util/src/org/openide/util/RequestProcessor.java @@ -41,15 +41,33 @@ package org.openide.util; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.ListIterator; +import java.util.Set; import java.util.Stack; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -157,7 +175,7 @@ * * @author Petr Nejedly, Jaroslav Tulach */ -public final class RequestProcessor implements Executor { +public final class RequestProcessor implements Executor, ScheduledExecutorService { /** the static instance for users that do not 
want to have own processor */ private static RequestProcessor DEFAULT = new RequestProcessor(); @@ -186,7 +204,7 @@ /** If the RP was stopped, this variable will be set, every new post() * will throw an exception and no task will be processed any further */ - boolean stopped = false; + volatile boolean stopped = false; /** The lock covering following four fields. They should be accessed * only while having this lock held. */ @@ -585,6 +603,789 @@ } } + /** + * Shut down this thread pool as defined in + * {@link java.util.concurrent.ExecutorService#shutdown()}.

If this method
+     * is invoked on the default RequestProcessor it throws an
+     * {@code IllegalStateException} instead of stopping anything.
+     * @throws IllegalStateException if this is the default request processor
+     * @since 8.2
+     */
+    @Override
+    public void shutdown() {
+        final boolean isDefaultProcessor = (UNLIMITED == this);
+        if (isDefaultProcessor) {
+            // Refuse to stop the instance the message below calls "the default".
+            throw new IllegalStateException("Cannot shut down the default " + //NOI18N
+                    "request processor"); //NOI18N
+        }
+        stop();
+    }
+
+    /**
+     * Shut down this thread pool as defined in
+     * {@link java.util.concurrent.ExecutorService#shutdownNow()}, returning
+     * a list of un-run or unfinished runnables.

If this method
+     * is called on the default RequestProcessor,
+     * throws an {@code IllegalStateException}.
+     * @return the runnables found on this pool's processor threads after the
+     * stop request; internal wrappers are replaced by the runnable they wrap
+     * where one is available
+     * @throws IllegalStateException if this is the default request processor
+     * @since 8.2
+     */
+    @Override
+    public List<Runnable> shutdownNow() {
+        if (this == UNLIMITED) {
+            throw new IllegalStateException ("Cannot shut down the default " + //NOI18N
+                    "request processor"); //NOI18N
+        }
+        //XXX more aggressive shutdown?
+        stop();
+        synchronized (processorLock) {
+            List<Runnable> result = new ArrayList<Runnable>(processors.size());
+            for (Processor p : processors) {
+                if (p != null && p.todo != null && p.todo.run != null) {
+                    Runnable r = p.todo.run;
+                    // Hand back the client's runnable rather than our wrapper.
+                    if (r instanceof RunnableWrapper) {
+                        Runnable other = ((RunnableWrapper) r).getRunnable();
+                        r = other == null ? r : other;
+                    }
+                    result.add(r);
+                }
+            }
+            return result;
+        }
+    }
+
+    /**
+     * Determine if this thread pool has been shut down, in accordance with
+     * the specification of {@link java.util.concurrent.ExecutorService#isShutdown()}
+     * @return true once this request processor has been stopped
+     * @since 8.2
+     */
+    @Override
+    public boolean isShutdown() {
+        return stopped;
+    }
+
+    /**
+     * Determine if this thread pool has been terminated, in accordance with
+     * the specification of {@link java.util.concurrent.ExecutorService#isTerminated()}
+     * @return true if no live processor thread currently belongs to this
+     * request processor
+     * @since 8.2
+     */
+    @Override
+    public boolean isTerminated() {
+        // NOTE(review): unlike the ExecutorService contract this does not
+        // require a prior shutdown() - confirm whether that is intended.
+        Set<Processor> set = collectProcessors(new HashSet<Processor>());
+        for (Processor p : set) {
+            if (p.isAlive() && p.belongsTo(this)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Wait until no live processor thread belongs to this request processor
+     * any more, or until the timeout elapses, in accordance with
+     * the specification of {@link java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)}
+     * @param timeout The maximum time to wait; negative values are treated as zero
+     * @param unit The time unit of the timeout
+     * @return True if no live processor thread of this request processor
+     * remained before the timeout elapsed
+     * @throws InterruptedException if interrupted while waiting
+     * @since 8.2
+     */
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        Parameters.notNull("unit", unit); //NOI18N
+        // toMillis converts *from* unit; unit.convert(timeout, MILLISECONDS)
+        // would treat the caller's value as if it already were milliseconds.
+        long timeoutMillis = Math.max(0L, unit.toMillis(timeout));
+        long now = System.currentTimeMillis();
+        // Saturate instead of overflowing for timeouts such as Long.MAX_VALUE.
+        long doneTime = timeoutMillis > Long.MAX_VALUE - now
+                ? Long.MAX_VALUE : now + timeoutMillis;
+        Set<Processor> procs = new HashSet<Processor>();
+        boolean waited;
+        do {
+            waited = false;
+            // Re-collect on every pass so threads added meanwhile are seen too.
+            for (Processor p : collectProcessors(procs)) {
+                if (!p.isAlive() || !p.belongsTo(this)) {
+                    continue;
+                }
+                long remaining = doneTime - System.currentTimeMillis();
+                if (remaining <= 0) {
+                    return isTerminated();
+                }
+                // remaining > 0 here, so this is never join(0) ("wait forever").
+                p.join(remaining);
+                waited = true;
+            }
+        } while (waited);
+        return true;
+    }
+
+    /**
+     * Refill {@code procs} with the processors that currently belong to this
+     * request processor.
+     * @param procs The set to reuse; it is cleared first
+     * @return the same set instance, for call chaining
+     */
+    private Set<Processor> collectProcessors (Set<Processor> procs) {
+        procs.clear();
+        synchronized (processorLock) {
+            for (Processor p : processors) {
+                if (p.belongsTo(this)) {
+                    procs.add(p);
+                }
+            }
+        }
+        return procs;
+    }
+
+    /**
+     * Submit a job to be run, and get back a {@link java.util.concurrent.Future}
+     * which can be waited on and asked for a result, following the contract of
+     * {@link java.util.concurrent.ExecutorService#submit(java.util.concurrent.Callable)}
+     *

+     * Note: If the passed {@link java.util.concurrent.Callable} implements
+     * {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()}
+     * method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked.
+     * If {@code Cancellable.cancel()} returns false, then the job will not be
+     * cancelled.
+     *
+     * @param <T> The return type of the passed job
+     * @param task The work to do
+     * @return A Future that can indicate work status and fetch a result
+     * @throws RejectedExecutionException if this RequestProcessor has been
+     * shut down
+     * @since 8.2
+     */
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        Parameters.notNull("task", task); //NOI18N
+        if (stopped) {
+            throw new RejectedExecutionException("Request Processor already " + //NOI18N
+                    "stopped"); //NOI18N
+        }
+        RPFutureTask<T> result = new RPFutureTask<T>(task);
+        Task t = create(result);
+        // The future needs its Task so that Future.cancel() can cancel it.
+        result.setTask(t);
+        t.schedule(0);
+        return result;
+    }
+    /**
+     * Submit a job to be run, and get back a {@link java.util.concurrent.Future}
+     * which can be waited on and asked for a result, following the contract of
+     * {@link java.util.concurrent.ExecutorService#submit(java.lang.Runnable, java.lang.Object)}
+     *

+     * Note: If the passed {@link java.lang.Runnable} implements
+     * {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()}
+     * method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked.
+     * If {@code Cancellable.cancel()} returns false, then the job will not be
+     * cancelled.
+     *
+     * @param <T> The type of the predefined result
+     * @param task The work to do
+     * @param predefinedResult The return value for the resulting Future's
+     * {@code get()} method
+     * @return A Future that can indicate work status and fetch the result
+     * @throws RejectedExecutionException if this RequestProcessor has been
+     * shut down
+     * @since 8.2
+     */
+    @Override
+    public <T> Future<T> submit(Runnable task, T predefinedResult) {
+        Parameters.notNull("task", task); //NOI18N
+        if (stopped) {
+            throw new RejectedExecutionException("Request Processor already " + //NOI18N
+                    "stopped"); //NOI18N
+        }
+        RPFutureTask<T> result = new RPFutureTask<T>(task, predefinedResult);
+        Task t = create(result);
+        // The future needs its Task so that Future.cancel() can cancel it.
+        result.setTask(t);
+        t.schedule(0);
+        return result;
+    }
+
+    /**
+     * Submit a job to be run, and get back a {@link java.util.concurrent.Future}
+     * which can be waited on and asked for a result, following the contract of
+     * {@link java.util.concurrent.ExecutorService#submit(java.lang.Runnable)}
+     *

+     * Note: If the passed {@link java.lang.Runnable} implements
+     * {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()}
+     * method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked.
+     * If {@code Cancellable.cancel()} returns false, then the job will not be
+     * cancelled.
+     *
+     * @param task The work to do
+     * @return A Future that can indicate work status; its {@code get()}
+     * yields {@code null}
+     * @throws RejectedExecutionException if this RequestProcessor has been
+     * shut down
+     * @since 8.2
+     */
+    @Override
+    public Future<?> submit(Runnable task) {
+        return this.submit (task, null);
+    }
+
+    /**
+     * Fail before anything is submitted if the collection contains a null
+     * task, so that a bad element cannot leave earlier tasks already running.
+     * @param tasks The tasks to validate; must itself be non-null
+     * @throws NullPointerException if any element is null
+     */
+    private static void rejectNullTasks(Collection<?> tasks) {
+        for (Object task : tasks) {
+            if (task == null) {
+                throw new NullPointerException ("Contains null tasks: " + //NOI18N
+                        tasks);
+            }
+        }
+    }
+
+    /**
+     * Invoke a collection of tasks, as defined in the specification of
+     * {@link java.util.concurrent.ExecutorService#invokeAll(java.util.Collection)}
+     * @param <T> The type of returned object
+     * @param tasks A collection of Callables with the same return type
+     * @return A list of futures which can be monitored for completion
+     * @throws InterruptedException if interrupted while waiting for the tasks
+     * @throws NullPointerException if tasks or any of its elements is null
+     * @since 8.2
+     */
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        Parameters.notNull("tasks", tasks); //NOI18N
+        rejectNullTasks(tasks);
+        List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
+        // Every wrapped task counts the latch down once when its call() ends.
+        CountDownLatch wait = new CountDownLatch(tasks.size());
+        for (Callable<T> c : tasks) {
+            Callable<T> delegate = new WaitableCallable<T>(c, wait);
+            result.add (submit(delegate));
+        }
+        wait.await();
+        return result;
+    }
+
+    /**
+     * Executes the given tasks, returning a list of Futures holding their
+     * status and results when all complete or the timeout expires, whichever
+     * happens first, as specified in
+     * {@link java.util.concurrent.ExecutorService#invokeAll(java.util.Collection,long,java.util.concurrent.TimeUnit)}
+     * @param <T> The result type
+     * @param tasks A collection of callables
+     * @param timeout The maximum time to wait for completion, in the specified time units
+     * @param unit The time unit
+     * @return A list of futures; all of them are cancelled if the timeout
+     * expired first
+     * @throws InterruptedException if interrupted while waiting for the tasks
+     * @throws NullPointerException if unit, tasks or any task is null
+     * @since 8.2
+     */
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+        Parameters.notNull("unit", unit); //NOI18N
+        Parameters.notNull("tasks", tasks); //NOI18N
+        rejectNullTasks(tasks);
+        CountDownLatch wait = new CountDownLatch(tasks.size());
+        List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
+        for (Callable<T> c : tasks) {
+            Callable<T> delegate = new WaitableCallable<T>(c, wait);
+            result.add (submit(delegate));
+        }
+        if (!wait.await(timeout, unit)) {
+            // Timed out: cancel everything submitted above.
+            for (Future<T> f : result) {
+                f.cancel(true);
+            }
+        }
+        return result;
+    }
+    /**
+     * Executes the given tasks, returning the result of one which has
+     * completed and cancelling any incomplete tasks, as specified in
+     * {@link java.util.concurrent.ExecutorService#invokeAny(java.util.Collection)}
+     * @param <T> The result type
+     * @param tasks A collection of callables
+     * @return The result stored by a task that completed
+     * @throws InterruptedException if execution is interrupted
+     * @throws ExecutionException declared by the interface
+     * @throws IllegalArgumentException if tasks is empty
+     * @throws NullPointerException if tasks or any of its elements is null
+     * @since 8.2
+     */
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        Parameters.notNull("tasks", tasks); //NOI18N
+        rejectNullTasks(tasks);
+        if (tasks.isEmpty()) {
+            // Nothing could ever release the latch below.
+            throw new IllegalArgumentException ("No tasks to invoke"); //NOI18N
+        }
+        CountDownLatch wait = new CountDownLatch(1);
+        List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
+        AtomicReference<T> ref = new AtomicReference<T>();
+        try {
+            for (Callable<T> c : tasks) {
+                Callable<T> delegate = new WaitableCallable<T>(c, ref, wait);
+                result.add (submit(delegate));
+            }
+            wait.await();
+        } finally {
+            // Success, interruption or rejection: cancel whatever was submitted.
+            for (Future<T> f : result) {
+                f.cancel(true);
+            }
+        }
+        return ref.get();
+    }
+    /**
+     * Executes the given tasks, returning the result of one which has
+     * completed before the timeout expires and cancelling the others, as
+     * specified in
+     * {@link java.util.concurrent.ExecutorService#invokeAny(java.util.Collection,long,java.util.concurrent.TimeUnit)}
+     * @param <T> The result type
+     * @param tasks A collection of callables
+     * @param timeout The maximum time to wait for completion, in the specified time units
+     * @param unit The time unit
+     * @return The result stored by a task that completed
+     * @throws InterruptedException if execution is interrupted
+     * @throws ExecutionException declared by the interface
+     * @throws TimeoutException if the timeout expires before any task releases
+     * the latch
+     * @throws IllegalArgumentException if tasks is empty
+     * @throws NullPointerException if unit, tasks or any task is null
+     * @since 8.2
+     */
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        Parameters.notNull("unit", unit); //NOI18N
+        Parameters.notNull("tasks", tasks); //NOI18N
+        rejectNullTasks(tasks);
+        if (tasks.isEmpty()) {
+            throw new IllegalArgumentException ("No tasks to invoke"); //NOI18N
+        }
+        CountDownLatch wait = new CountDownLatch(1);
+        List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
+        AtomicReference<T> ref = new AtomicReference<T>();
+        try {
+            for (Callable<T> c : tasks) {
+                Callable<T> delegate = new WaitableCallable<T>(c, ref, wait);
+                result.add (submit(delegate));
+            }
+            if (!wait.await(timeout, unit)) {
+                // Previously the timeout was swallowed and null was returned.
+                throw new TimeoutException ("No task completed within " + //NOI18N
+                        timeout + " " + unit); //NOI18N
+            }
+        } finally {
+            for (Future<T> f : result) {
+                f.cancel(true);
+            }
+        }
+        return ref.get();
+    }
+
+    /**
+     * Schedule a runnable to execute after the specified delay.
+     * @param command The runnable
+     * @param delay The delay
+     * @param unit The time units of the delay
+     * @return A future which can be monitored for completion
+     * @throws IllegalArgumentException if the delay is negative or longer
+     * than {@code Integer.MAX_VALUE} milliseconds
+     * @throws RejectedExecutionException if this RequestProcessor has been
+     * shut down
+     * @since 8.2
+     */
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+        Parameters.notNull("command", command); //NOI18N
+        Parameters.notNull("unit", unit); //NOI18N
+        if (delay < 0) {
+            throw new IllegalArgumentException ("Negative delay: " + delay);
+        }
+        if (stopped) {
+            throw new RejectedExecutionException("Request Processor already stopped"); //NOI18N
+        }
+        // toMillis converts *from* unit; unit.convert(delay, MILLISECONDS)
+        // went the other way and turned e.g. 5 seconds into 0 ms.
+        long delayMillis = unit.toMillis(delay);
+        // Task.schedule takes an int number of milliseconds.
+        if (delayMillis > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException ("Requested delay " + delayMillis + //NOI18N
+                    " is > Integer.MAX_VALUE"); //NOI18N
+        }
+        ScheduledRPFutureTask<Void> result = new ScheduledRPFutureTask<Void>(command, null, delayMillis);
+        Task t = create(result);
+        result.setTask(t);
+        t.schedule((int) delayMillis);
+        return result;
+    }
+    /**
+     * Schedule a {@link java.util.concurrent.Callable} to execute on another thread
+     * after the specified delay.
+     * @param <T> The result type of the callable
+     * @param callable The work to run
+     * @param delay The delay
+     * @param unit The time units of the delay
+     * @return A future which can be monitored for completion
+     * @throws IllegalArgumentException if the delay is negative or longer
+     * than {@code Integer.MAX_VALUE} milliseconds
+     * @throws RejectedExecutionException if this RequestProcessor has been
+     * shut down
+     * @since 8.2
+     */
+    @Override
+    public <T> ScheduledFuture<T> schedule(Callable<T> callable, long delay, TimeUnit unit) {
+        Parameters.notNull("unit", unit); //NOI18N
+        Parameters.notNull("callable", callable); //NOI18N
+        if (delay < 0) {
+            throw new IllegalArgumentException ("Negative delay: " + delay);
+        }
+        if (stopped) {
+            throw new RejectedExecutionException("Request Processor already " + //NOI18N
+                    "stopped"); //NOI18N
+        }
+        // toMillis converts *from* unit; unit.convert(delay, MILLISECONDS)
+        // went the other way and turned e.g. 5 seconds into 0 ms.
+        long delayMillis = unit.toMillis(delay);
+        // Task.schedule takes an int number of milliseconds.
+        if (delayMillis > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException ("Requested delay " + //NOI18N
+                    delayMillis + " is > Integer.MAX_VALUE"); //NOI18N
+        }
+        ScheduledRPFutureTask<T> result = new ScheduledRPFutureTask<T>(callable, delayMillis);
+        Task t = create(result);
+        result.setTask(t);
+        t.schedule((int) delayMillis);
+        return result;
+    }
+
+    /**
+     * Schedule a runnable which will run with a given frequency, regardless
+     * of how long execution takes, with the exception that if execution takes
+     * longer than the specified delay, execution will be delayed but will
+     * never be run on two threads concurrently.
+     * As specified in
+     * {@link java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate(java.lang.Runnable,long,long,java.util.concurrent.TimeUnit)}
+     * @param command The runnable
+     * @param initialDelay The delay before first running
+     * @param period The frequency with which the runnable should run after the
+     * first run
+     * @param unit The time units in which the initial delay and period are
+     * specified
+     * @return A future which can be monitored for completion, or cancelled
+     * @since 8.2
+     */
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+        return scheduleFixed(command, initialDelay, period, unit, false);
+    }
+
+    /**
+     * Schedule a runnable which will run repeatedly after the specified initial
+     * delay, with the specified delay between the completion of one run and
+     * the start of the next.
+     * As specified in
+     * {@link java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay(java.lang.Runnable,long,long,java.util.concurrent.TimeUnit)}
+     * @param command The runnable
+     * @param initialDelay The delay before first run
+     * @param delay The delay between runs
+     * @param unit The time units of the delay parameters
+     * @return A future which can be monitored for completion, or cancelled
+     * @since 8.2
+     */
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        return scheduleFixed(command, initialDelay, delay, unit, true);
+    }
+
+    /**
+     * Shared implementation of the two repeating schedule methods.
+     * @param command The runnable
+     * @param initialDelay Delay before the first run, in {@code unit}
+     * @param period Period (fixed rate) or gap between runs (fixed delay), in {@code unit}
+     * @param unit The time unit of both delays
+     * @param fixedDelay true for fixed-delay semantics, false for fixed-rate
+     * @return the wrapper which is both the scheduled runnable and its future
+     * @throws IllegalArgumentException if a delay is negative or exceeds
+     * {@code Integer.MAX_VALUE} milliseconds
+     * @throws RejectedExecutionException if this RequestProcessor has been shut down
+     */
+    private ScheduledFuture<?> scheduleFixed (Runnable command, long initialDelay, long period, TimeUnit unit, boolean fixedDelay) {
+        Parameters.notNull("unit", unit); //NOI18N
+        Parameters.notNull("command", command); //NOI18N
+        if (period < 0) {
+            throw new IllegalArgumentException ("Negative delay: " + period); //NOI18N
+        }
+        if (initialDelay < 0) {
+            throw new IllegalArgumentException ("Negative initialDelay: " //NOI18N
+                    + initialDelay);
+        }
+        if (stopped) {
+            throw new RejectedExecutionException("Request Processor already " + //NOI18N
+                    "stopped"); //NOI18N
+        }
+        // toMillis converts *from* unit; unit.convert(x, MILLISECONDS) went
+        // the other way round.
+        long initialDelayMillis = unit.toMillis(initialDelay);
+        if (initialDelayMillis > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException("Initial delay > " + //NOI18N
+                    "Integer.MAX_VALUE milliseconds: " + initialDelayMillis); //NOI18N
+        }
+        long periodMillis = unit.toMillis(period);
+        if (periodMillis > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException("Period > " + //NOI18N
+                    "Integer.MAX_VALUE milliseconds: " + periodMillis); //NOI18N
+        }
+
+        // Both wrappers work in milliseconds, so pass the converted values.
+        TaskFutureWrapper wrap = fixedDelay ?
+            new FixedDelayTask(command, initialDelayMillis, periodMillis) :
+            new FixedRateTask(command, initialDelayMillis, periodMillis);
+        Task t = create(wrap);
+        wrap.t = t;
+        // Share one flag so that Task.cancel(boolean) and the wrapper agree.
+        t.cancelled = wrap.cancelled;
+        t.schedule ((int) initialDelayMillis);
+
+        return wrap;
+    }
+
+    /**
+     * Base class of the repeating tasks: the runnable handed to the Task and,
+     * at the same time, the ScheduledFuture returned to the caller.
+     * Delays are held in milliseconds.
+     */
+    private static abstract class TaskFutureWrapper implements ScheduledFuture<Void>, Runnable, RunnableWrapper {
+        volatile Task t;
+        protected final Runnable toRun;
+        protected final long initialDelay;
+        protected final long period;
+        final AtomicBoolean cancelled = new AtomicBoolean();
+        TaskFutureWrapper(Runnable run, long initialDelay, long period) {
+            this.toRun = run;
+            this.initialDelay = initialDelay;
+            this.period = period;
+        }
+
+        @Override
+        public final Runnable getRunnable() {
+            return toRun;
+        }
+
+        @Override
+        public int compareTo(Delayed o) {
+            long other = o.getDelay(TimeUnit.MILLISECONDS);
+            long ours = getDelay(TimeUnit.MILLISECONDS);
+            // Compare instead of subtracting: the narrowed difference can
+            // overflow and report the wrong sign.
+            return ours < other ? -1 : (ours == other ? 0 : 1);
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            boolean result = true;
+            if (toRun instanceof Cancellable) {
+                // The client's runnable may veto the cancellation.
+                result = ((Cancellable) toRun).cancel();
+            }
+            if (result) {
+                //will invoke cancelled.set(true)
+                result = t.cancel(mayInterruptIfRunning);
+            }
+            return result;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return cancelled.get();
+        }
+
+        @Override
+        public boolean isDone() {
+            return cancelled.get() || t.isFinished();
+        }
+
+        @Override
+        public Void get() throws InterruptedException, ExecutionException {
+            if (cancelled.get()) {
+                throw new CancellationException();
+            }
+            t.waitFinished();
+            if (cancelled.get()) {
+                throw new CancellationException();
+            }
+            return null;
+        }
+
+        @Override
+        public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            if (cancelled.get()) {
+                throw new CancellationException();
+            }
+            // toMillis converts *from* unit (the original converted the other way).
+            long millis = unit.toMillis(timeout);
+            boolean finished = t.waitFinished(millis);
+            if (cancelled.get()) {
+                throw new CancellationException();
+            }
+            if (!finished) {
+                // Report the expired wait instead of returning as if done.
+                throw new TimeoutException();
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Repeating task whose runs are planned at start + n * period, where
+     * start is the time of the first run.
+     */
+    private static final class FixedRateTask extends TaskFutureWrapper {
+        private final Object runLock = new Object();
+        private final Object timeLock = new Object();
+        //must be accessed holding timeLock
+        private int runCount;
+        private long nextRunTime;
+        private long start = Long.MIN_VALUE;
+        volatile boolean firstRun = true;
+        FixedRateTask (Runnable run, long initialDelay, long period) {
+            super (run, initialDelay, period);
+        }
+
+        @Override
+        public void run() {
+            if (firstRun) {
+                synchronized (timeLock) {
+                    start = System.currentTimeMillis();
+                    firstRun = false;
+                }
+            }
+            try {
+                // Never run the client's runnable on two threads at once.
+                synchronized(runLock) {
+                    toRun.run();
+                }
+            } catch (RuntimeException e) {
+                // A failing run ends the repetition.
+                cancel(true);
+                throw e;
+            }
+            reschedule();
+        }
+
+        private void reschedule() {
+            //All access to nextRunTime & runCount under lock.
+            long interval;
+            synchronized (timeLock) {
+                // start is the time of the first run, so the initial delay
+                // has already elapsed: run n+1 is due at start + n * period.
+                // (The original added initialDelay again and lagged one period.)
+                runCount++;
+                nextRunTime = start + period * runCount;
+                interval = Math.max(0, nextRunTime - System.currentTimeMillis());
+                if (interval > Integer.MAX_VALUE) {
+                    throw new IllegalStateException ("Interval > Integer.MAX_VALUE: " + interval); //NOI18N
+                }
+            }
+            boolean canContinue = !cancelled.get() && !Thread.currentThread().isInterrupted();
+            if (canContinue) {
+                t.schedule((int) interval);
+            }
+        }
+
+        @Override
+        public long getDelay(TimeUnit unit) {
+            if (isCancelled()) {
+                return Long.MAX_VALUE;
+            }
+            long delay;
+            synchronized (timeLock) {
+                // max, not min: min(0, x) could never report a positive delay.
+                delay = Math.max(0, nextRunTime - System.currentTimeMillis());
+            }
+            return unit.convert(delay, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Repeating task which waits {@code period} milliseconds between the end
+     * of one run and the start of the next.
+     */
+    private static final class FixedDelayTask extends TaskFutureWrapper {
+        private final AtomicLong nextRunTime = new AtomicLong();
+        FixedDelayTask(Runnable run, long initialDelay, long period) {
+            super (run, initialDelay, period);
+            // Gives getDelay() a meaningful value before the first run.
+            nextRunTime.set(System.currentTimeMillis() + initialDelay);
+        }
+
+        @Override
+        public long getDelay(TimeUnit unit) {
+            long next = nextRunTime.get();
+            return unit.convert (next - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        }
+
+        @Override
+        public void run() {
+            if (!fini()) {
+                toRun.run();
+            }
+            if (!fini()) {
+                reschedule();
+            }
+        }
+
+        /** True once the task was cancelled or the current thread interrupted. */
+        private boolean fini() {
+            boolean result = cancelled.get() || Thread.currentThread().isInterrupted();
+            return result;
+        }
+
+        private void reschedule() {
+            // scheduleFixed already applied the initial delay before the
+            // first run; every later run follows after the period.
+            // (The original waited initialDelay once more after the first run.)
+            long delay = period;
+            nextRunTime.set(System.currentTimeMillis() + delay);
+            if (!fini()) {
+                t.schedule((int) delay);
+            }
+        }
+    }
+
+    /** Implemented by internal runnables that wrap a client's runnable. */
+    private interface RunnableWrapper {
+        /** @return the wrapped runnable, or null if there is none */
+        Runnable getRunnable();
+    }
+
+    /**
+     * Callable used by invokeAll/invokeAny: delegates to the client's
+     * callable and counts a latch down when the call ends. With a non-null
+     * {@code ref} (invokeAny) the result is stored there and a call ending
+     * in a RuntimeException or Error does not release the latch.
+     */
+    private static final class WaitableCallable<T> implements Callable<T>, Cancellable {
+        private final CountDownLatch countdown;
+        private final Callable<T> delegate;
+        private final AtomicReference<T> ref;
+        private volatile boolean failed;
+        WaitableCallable(Callable<T> delegate, CountDownLatch countdown) {
+            this (delegate, null, countdown);
+        }
+
+        WaitableCallable(Callable<T> delegate, AtomicReference<T> ref, CountDownLatch countdown) {
+            this.delegate = delegate;
+            this.countdown = countdown;
+            this.ref = ref;
+        }
+
+        boolean failed() {
+            return failed;
+        }
+
+        @Override
+        public T call() throws Exception {
+            try {
+                T result = delegate.call();
+                if (ref != null) {
+                    ref.set(result);
+                }
+                return result;
+            } catch (RuntimeException e) {
+                failed = true;
+                throw e;
+            } catch (Error e) {
+                failed = true;
+                throw e;
+            } finally {
+                // NOTE(review): a checked exception does not set failed, so in
+                // invokeAny mode it still releases the latch - confirm intent.
+                if (!failed || ref == null) {
+                    countdown.countDown();
+                }
+            }
+        }
+
+        @Override
+        public boolean cancel() {
+            return delegate instanceof Cancellable ? ((Cancellable) delegate).cancel() : true;
+        }
+    }
+
+    /**
+     * FutureTask which also cancels the RequestProcessor Task running it and
+     * gives a Cancellable job the chance to veto the cancellation.
+     */
+    private static class RPFutureTask<T> extends FutureTask<T> implements RunnableWrapper {
+        protected volatile Task task;
+        private final Runnable runnable;
+        private final Cancellable cancellable;
+        RPFutureTask(Callable<T> c) {
+            super (c);
+            this.runnable = null;
+            this.cancellable = c instanceof Cancellable ? (Cancellable) c : null;
+        }
+
+        RPFutureTask(Runnable r, T result) {
+            super (r, result);
+            this.runnable = r;
+            this.cancellable = r instanceof Cancellable ? (Cancellable) r : null;
+        }
+
+        void setTask(Task task) {
+            this.task = task;
+        }
+
+        RPFutureTask(Callable<T> c, T predefinedResult) {
+            this (c);
+            set(predefinedResult);
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            if (cancellable != null) {
+                boolean result = cancellable.cancel();
+                if (result) {
+                    //note & not && - task.cancel() and super.cancel() must be invoked,
+                    //if and only if the underlying cancellable is really null or
+                    //returned true from cancel().
+                    return task.cancel() & super.cancel(mayInterruptIfRunning);
+                } else {
+                    return result;
+                }
+            } else {
+                return task.cancel() & super.cancel(mayInterruptIfRunning);
+            }
+        }
+
+        @Override
+        public Runnable getRunnable() {
+            return this.runnable;
+        }
+    }
+
+    /** RPFutureTask which additionally reports the delay it was scheduled with. */
+    private static final class ScheduledRPFutureTask<T> extends RPFutureTask<T> implements ScheduledFuture<T> {
+        protected final long delayMillis;
+        ScheduledRPFutureTask(Callable<T> c, long delayMillis) {
+            super (c);
+            this.delayMillis = delayMillis;
+        }
+
+        ScheduledRPFutureTask(Runnable r, T result, long delayMillis) {
+            super (r, result);
+            this.delayMillis = delayMillis;
+        }
+
+
+        @Override
+        public long getDelay(TimeUnit unit) {
+            // Convert our milliseconds *into* the requested unit (the original
+            // read delayMillis as if it were expressed in unit).
+            // NOTE(review): this is the delay requested at scheduling time,
+            // not the time remaining - confirm against the Delayed contract.
+            return unit.convert(delayMillis, TimeUnit.MILLISECONDS);
+        }
+
+        @Override
+        public int compareTo(Delayed o) {
+            long otherDelayMillis = o.getDelay(TimeUnit.MILLISECONDS);
+            // Compare instead of subtracting: the narrowed difference can
+            // overflow and report the wrong sign.
+            return delayMillis < otherDelayMillis ? -1
+                    : (delayMillis == otherDelayMillis ? 0 : 1);
+        }
+    }
+
     private class EnqueueTask extends TimerTask {
         Item itm;

@@ -610,6 +1411,7 @@
         private int priority = Thread.MIN_PRIORITY;
         private long time = 0;
         private Thread lastThread = null;
+        private AtomicBoolean cancelled;

         /** @param run runnable to start
         * @param delay amount of millis to wait
@@ -685,6 +1487,9 @@
             final Item localItem;

             synchronized (processorLock) {
+                if (cancelled != null) {
+                    cancelled.set(false);
+                }
                 notifyRunning();

                 if (item != null) {
@@ -747,6 +1552,49 @@
             }
         }

+        /**
+         * Implementation of cancel for use with Future objects, to guarantee
+         * that the thread will be interrupted, no matter what the setting
+         * on the owning RP.
+         * @param interrupt If true, the thread should be interrupted
+         * @return true if cancellation occurred; false if the task was already
+         * cancelled or no item was pending
+         */
+        boolean cancel (boolean interrupt) {
+            synchronized (processorLock) {
+                // getAndSet returns the previous flag: true means an earlier
+                // call already cancelled this task (the original test was inverted).
+                if (cancelled != null && cancelled.getAndSet(true)) {
+                    return false;
+                }
+                boolean success;
+
+                if (item == null) {
+                    success = false;
+                } else {
+                    Processor p = item.getProcessor();
+                    success = item.clear(null);
+
+                    if (p != null) {
+                        if (interrupt) {
+                            success = p.interrupt(this, RequestProcessor.this);
+                        } else {
+                            //despite its name, will not actually interrupt
+                            //unless the RP specifies that it should
+                            p.interruptTask(this, RequestProcessor.this);
+                        }
+                        if (success) {
+                            item = null;
+                        }
+                    }
+                }
+                if (success) {
+                    notifyFinished(); // mark it as finished
+                }
+                return success;
+            }
+        }
+
         /** Current priority of the task.
         * @return the priority level (see e.g. {@link Thread#NORM_PRIORITY}
         */
@@ -1062,6 +1910,12 @@
             }
         }

+        boolean belongsTo(RequestProcessor r) { // is r this processor's current source?
+            synchronized (lock) {
+                return source == r;
+            }
+        }
+
         /**
          * The method that will repeatedly wait for a request and perform it.
          */
@@ -1190,6 +2044,14 @@
             }
         }

+        boolean interrupt (Task t, RequestProcessor src) { // src is not consulted
+            if (t != todo) { // only interrupt while t is this processor's todo
+                return false;
+            }
+            interrupt();
+            return true;
+        }
+
         /** @see "#20467" */
         private static void doNotify(RequestProcessor.Task todo, Throwable ex) {
             if (SLOW && todo.item.message == null) {
diff --git a/openide.util/test/unit/src/org/openide/util/RequestProcessor180386Test.java b/openide.util/test/unit/src/org/openide/util/RequestProcessor180386Test.java
new file mode 100644
--- /dev/null
+++ b/openide.util/test/unit/src/org/openide/util/RequestProcessor180386Test.java
@@ -0,0 +1,1176 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ *
+ * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
+ * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common + * Development and Distribution License("CDDL") (collectively, the + * "License"). You may not use this file except in compliance with the + * License. You can obtain a copy of the License at + * http://www.netbeans.org/cddl-gplv2.html + * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the + * specific language governing permissions and limitations under the + * License. When distributing the software, include this License Header + * Notice in each file and include the License file at + * nbbuild/licenses/CDDL-GPL-2-CP. Sun designates this + * particular file as subject to the "Classpath" exception as provided + * by Sun in the GPL Version 2 section of the License file that + * accompanied this code. If applicable, add the following below the + * License Header, with the fields enclosed by brackets [] replaced by + * your own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * If you wish your version of this file to be governed by only the CDDL + * or only the GPL Version 2, indicate your decision by adding + * "[Contributor] elects to include this software in this distribution + * under the [CDDL or GPL Version 2] license." If you do not indicate a + * single choice of license, a recipient has the option to distribute + * your version of this file under either the CDDL, the GPL Version 2 or + * to extend the choice of license to its licensees as provided above. + * However, if you add GPL Version 2 code and therefore, elected the GPL + * Version 2 license, then the option applies only if the new code is + * made subject to such option by the copyright holder. + * + * Contributor(s): + * + * Portions Copyrighted 2010 Sun Microsystems, Inc. 
+ */ +package org.openide.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import junit.framework.Test; +import org.netbeans.junit.NbTestCase; +import org.netbeans.junit.NbTestSuite; +import org.netbeans.junit.RandomlyFails; + +/** + * + * @author Tim Boudreau + */ +public class RequestProcessor180386Test extends NbTestCase { + + public RequestProcessor180386Test(java.lang.String testName) { + super(testName); + } + + public static Test suite() { + Test t = null; + if (t == null) { + t = new NbTestSuite(RequestProcessor180386Test.class); + } + return t; + } + + static { + RequestProcessor.logger().setLevel(Level.ALL); + } + + public void testSubmit() throws Exception { + class C implements Callable { + + volatile boolean hasRun; + + @Override + public String call() throws Exception { + String result = "Hello"; + hasRun = true; + return result; + } + } + C c = new C(); + Future f = RequestProcessor.getDefault().submit(c); + assertEquals("Hello", f.get()); + assertTrue(c.hasRun); + + class R implements Runnable { + + volatile boolean hasRun; + + @Override + public void run() { + hasRun = true; + } + } + R r = new R(); + f = RequestProcessor.getDefault().submit(r, "Goodbye"); + assertEquals("Goodbye", f.get()); + assertTrue(r.hasRun); + } + + public void testSomeTasksNotRunIfShutDown() throws Exception { + final Object lock = new Object(); + int count = 10; + final CountDownLatch waitAllLaunched = new CountDownLatch(count); + final CountDownLatch waitOneFinished = new CountDownLatch(1); + 
RequestProcessor rp = new RequestProcessor("TestRP", count * 2); + class R implements Runnable { + + volatile boolean hasStarted; + volatile boolean hasFinished; + + @Override + public void run() { + hasStarted = true; + waitAllLaunched.countDown(); + synchronized (lock) { + try { + lock.wait(); + if (Thread.interrupted()) { + return; + } + } catch (InterruptedException ex) { + return; + } finally { + waitOneFinished.countDown(); + } + hasFinished = true; + } + } + } + Set> s = new HashSet>(); + Set rs = new HashSet(); + for (int i = 0; i < count; i++) { + String currName = "Runnable " + i; + R r = new R(); + rs.add(r); + s.add(rp.submit(r, currName)); + } + waitAllLaunched.await(); + synchronized (lock) { + //Notify just one thread + lock.notify(); + } + waitOneFinished.await(); + List notRun = rp.shutdownNow(); + synchronized (lock) { + lock.notifyAll(); + } + boolean allFinished = true; + int finishedCount = 0; + for (R r : rs) { + assertTrue(r.hasStarted); + allFinished &= r.hasFinished; + if (r.hasFinished) { + finishedCount++; + } + } + assertFalse("All tasks should not have completed", allFinished); + assertTrue("At least one task shall complete", finishedCount >= 1); + assertFalse(notRun.isEmpty()); + assertTrue(rp.isShutdown()); + //Technically not provable due to "spurious wakeups" + // assertEquals (1, finishedCount); + + try { + RequestProcessor.getDefault().shutdown(); + fail("Should not be able to shutdown() default RP"); + } catch (Exception e) { + } + try { + RequestProcessor.getDefault().shutdownNow(); + fail("Should not be able to shutdownNow() default RP"); + } catch (Exception e) { + } + for (Runnable nr : notRun) { + assertTrue ("Shutdown is not returning submitted runnables - got a " + + nr.getClass().getName() + " instead of " + + R.class.getName(), nr.getClass() == R.class); + } + } + + public void testAwaitTermination() throws Exception { + int count = 20; + final Object lock = new Object(); + final CountDownLatch waitAllLaunched = new 
CountDownLatch(count); + final CountDownLatch waitAll = new CountDownLatch(count); + final RequestProcessor rp = new RequestProcessor("TestRP", count); + class R implements Runnable { + + volatile boolean hasStarted; + volatile boolean hasFinished; + + @Override + public void run() { + hasStarted = true; + waitAllLaunched.countDown(); + synchronized (lock) { + try { + lock.wait(); + if (Thread.interrupted()) { + return; + } + } catch (InterruptedException ex) { + return; + } finally { + hasFinished = true; + waitAll.countDown(); + } + } + } + } + Set> s = new HashSet>(); + Set rs = new HashSet(); + for (int i = 0; i < count; i++) { + String currName = "Runnable " + i; + R r = new R(); + rs.add(r); + s.add(rp.submit(r, currName)); + } + waitAllLaunched.await(); + synchronized (lock) { + //Notify just one thread + lock.notifyAll(); + } + rp.shutdown(); + boolean awaitTermination = rp.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + assertTrue(awaitTermination); + assertTrue(rp.isShutdown()); + assertTrue(rp.isTerminated()); + } + + @RandomlyFails + public void testAwaitTerminationWaitsForNewlyAddedThreads() throws Exception { + final RequestProcessor rp = new RequestProcessor("testAwaitTerminationWaitsForNewlyAddedThreads", 50, false); + int count = 30; + final CountDownLatch waitLock = new CountDownLatch(1); + class R implements Runnable { + boolean done; + @Override + public void run() { + try { + waitLock.await(); + } catch (InterruptedException ex) { + done = true; + } finally { + done = true; + } + } + } + Set rs = new HashSet(); + for (int i= 0; i < count; i++) { + R r = new R(); + rs.add(r); + rp.submit(r); + } + final CountDownLatch shutdownBegun = new CountDownLatch(1); + Runnable shutdowner = new Runnable() { + @Override + public void run() { + try { + Thread.sleep(1000); + rp.shutdown(); + shutdownBegun.countDown(); + } catch (InterruptedException ex) { + Exceptions.printStackTrace(ex); + } + } + }; + waitLock.countDown(); + new 
Thread(shutdowner).start(); + assertTrue(rp.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS)); + Thread.sleep (600); + assertTrue (rp.isTerminated()); + } + + public void testInvokeAll() throws Exception { + int count = 20; + final CountDownLatch waitAll = new CountDownLatch(count); + final RequestProcessor rp = new RequestProcessor("TestRP", count); + try { + class C implements Callable { + + private final String result; + volatile boolean ran; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + ran = true; + waitAll.countDown(); + return result; + } + } + List callables = new ArrayList(count); + List> fs; + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; + names.add(name); + C c = new C(name); + callables.add(c); + } + fs = rp.invokeAll(callables); + assertNotNull(fs); + assertEquals(0, waitAll.getCount()); + for (Future f : fs) { + assertTrue (f.isDone()); + } + for (C c : callables) { + assertTrue (c.ran); + } + Set s = new HashSet(count); + for (Future f : fs) { + s.add(f.get()); + } + assertEquals(names, s); + } finally { + rp.stop(); + } + } + + public void testInvokeAllWithTimeout() throws Exception { + int count = 20; + final CountDownLatch blocker = new CountDownLatch(1); + final RequestProcessor rp = new RequestProcessor("TestRP", count); + try { + class C implements Callable { + volatile boolean iAmSpecial; + + private final String result; + volatile boolean ran; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + //Only one will be allowed to run, the rest + //will be cancelled + if (!iAmSpecial) { + blocker.await(); + } + ran = true; + return result; + } + } + List callables = new ArrayList(count); + C special = new C("Special"); + special.iAmSpecial = true; + callables.add(special); + List> fs; + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; 
+ names.add(name); + C c = new C(name); + callables.add(c); + } + fs = rp.invokeAll(callables, 1000, TimeUnit.MILLISECONDS); + assertNotNull(fs); + for (Future f : fs) { + assertTrue (f.isDone()); + } + for (C c : callables) { + if (c == special) { + assertTrue (c.ran); + } else { + assertFalse(c.ran); + } + } + } finally { + rp.stop(); + } + } + + public void testInvokeAllSingleThread() throws Exception { + int count = 20; + final CountDownLatch waitAll = new CountDownLatch(count); + final RequestProcessor rp = new RequestProcessor("TestRP", 1); + class C implements Callable { + + private final String result; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + waitAll.countDown(); + return result; + } + } + List l = new ArrayList(count); + List> fs; + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; + names.add(name); + C c = new C(name); + l.add(c); + } + fs = rp.invokeAll(l); + assertNotNull(fs); + Set s = new HashSet(count); + for (Future f : fs) { + s.add(f.get()); + } + assertEquals(names, s); + } + + public void testInvokeAny() throws Exception { + int count = 20; + final RequestProcessor rp = new RequestProcessor("TestRP", count + 1); + class C implements Callable { + + private final String result; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + if (Thread.interrupted()) { + return null; + } + return result; + } + } + List l = new ArrayList(count); + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; + names.add(name); + C c = new C(name); + l.add(c); + } + String res = rp.invokeAny(l); + assertNotNull(res); + assertTrue(res.startsWith("R")); + } + + public void testInvokeAnySingleThread() throws Exception { + int count = 1000; + final RequestProcessor rp = new RequestProcessor("TestRP", 20); + final CountDownLatch latch = new CountDownLatch(count); + class 
C implements Callable { + + volatile boolean hasRun; + private final String result; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + latch.countDown(); + if (!"R17".equals(result)) { + try { + //Block all but one thread until threads have entered + latch.await(); + } catch (InterruptedException ie) { + } + } + Thread.yield(); + hasRun = true; + return result; + } + } + List l = new ArrayList(count); + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; + names.add(name); + C c = new C(name); + l.add(c); + } + String res = rp.invokeAny(l); + assertNotNull(res); + assertTrue(res.startsWith("R")); + int runCount = 0; + for (C c : l) { + if (c.hasRun) { + runCount++; + } + } + assertTrue("Not all " + count + " threads should have completed, but " + runCount + " did.", runCount < count); + } + + public void testInvokeAnyWithTimeout() throws Exception { + int count = 20; + final RequestProcessor rp = new RequestProcessor("TestRP", count + 1); + final CountDownLatch latch = new CountDownLatch(1); + class C implements Callable { + + volatile boolean hasRun; + private final String result; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + latch.await(); + if (Thread.interrupted()) { + return null; + } + hasRun = true; + return result; + } + } + List l = new ArrayList(count); + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; + names.add(name); + C c = new C(name); + l.add(c); + } + //All threads are waiting on latch; we should time out + String res = rp.invokeAny(l, 400, TimeUnit.MILLISECONDS); + assertNull(res); + for (C c : l) { + assertFalse(c.hasRun); + } + } + + public void testCancellation() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + class C implements Callable { + + volatile boolean hasRun; + volatile boolean interrupted; + + @Override 
+ public String call() throws Exception { + try { + latch.await(); + } catch (InterruptedException e) { + interrupted = true; + return null; + } + if (Thread.interrupted()) { + interrupted = true; + return null; + } + hasRun = true; + return "Hello"; + } + } + C c = new C(); + Future f = RequestProcessor.getDefault().submit(c); + f.cancel(true); + latch.countDown(); + String s = null; + try { + s = f.get(); + fail("CancellationException should have been thrown"); + } catch (CancellationException e) { + } + assertNull(s); + assertTrue(c.interrupted || !c.hasRun); + assertFalse(c.hasRun); + } + + public void testCancellablesGetCancelInvokedWithCallable() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch exit = new CountDownLatch(1); + class C implements Callable, Cancellable { + + volatile boolean hasRun; + volatile boolean interrupted; + volatile boolean cancelled; + + @Override + public String call() throws Exception { + try { + try { + latch.await(); + } catch (InterruptedException e) { + interrupted = true; + return null; + } + if (Thread.interrupted()) { + interrupted = true; + return null; + } + if (cancelled) { + return null; + } + hasRun = true; + return "Hello"; + } finally { + exit.countDown(); + } + } + + @Override + public boolean cancel() { + cancelled = true; + exit.countDown(); + return true; + } + } + C c = new C(); + Future f = RequestProcessor.getDefault().submit(c); + f.cancel(true); + assertTrue (c.cancelled); + latch.countDown(); + exit.await(); + String s = null; + try { + s = f.get(); + fail ("Should have gotten cancellation exception"); + } catch (CancellationException e) { + + } + } + + public void testCancellablesGetCancelInvokedWithRunnable() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch exit = new CountDownLatch(1); + class C implements Runnable, Cancellable { + + volatile boolean hasRun; + volatile boolean interrupted; + volatile boolean 
cancelled; + + @Override + public void run() { + try { + try { + latch.await(); + } catch (InterruptedException e) { + interrupted = true; + return; + } + if (Thread.interrupted()) { + interrupted = true; + return; + } + if (cancelled) { + return; + } + hasRun = true; + } finally { + exit.countDown(); + } + } + + @Override + public boolean cancel() { + cancelled = true; + exit.countDown(); + return true; + } + } + C c = new C(); + Future f = RequestProcessor.getDefault().submit(c); + f.cancel(true); + assertTrue (c.cancelled); + latch.countDown(); + exit.await(); + try { + f.get(); + fail ("Should have gotten cancellation exception"); + } catch (CancellationException e) { + + } + assertFalse (c.hasRun); + } + + public void testCancellablesThatSayTheyCantBeCancelledAreNotCancelledViaFutureDotCancel() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch exit = new CountDownLatch(1); + class C implements Runnable, Cancellable { + + volatile boolean hasRun; + volatile boolean interrupted; + volatile boolean cancelCalled; + + @Override + public void run() { + try { + try { + latch.await(); + } catch (InterruptedException e) { + interrupted = true; + throw new AssertionError(e); + } + if (Thread.interrupted()) { + interrupted = true; + throw new AssertionError("Thread should not have been interrupted"); + } + hasRun = true; + } finally { + exit.countDown(); + } + } + + @Override + public boolean cancel() { + cancelCalled = true; + return false; + } + } + C c = new C(); + Future f = RequestProcessor.getDefault().submit(c); + f.cancel(true); + assertFalse (f.isCancelled()); + assertTrue (c.cancelCalled); + latch.countDown(); + exit.await(); + f.get(); + assertFalse (f.isCancelled()); + assertTrue (c.hasRun); + } + + public void testInvokeAllCancellation() throws Exception { + int count = 20; + final CountDownLatch waitAll = new CountDownLatch(count); + final RequestProcessor rp = new RequestProcessor("TestRP", count); + class C 
implements Callable, Cancellable { + + private final String result; + volatile boolean cancelCalled; + + C(String result) { + this.result = result; + } + + @Override + public String call() throws Exception { + waitAll.countDown(); + return cancelCalled ? null : result; + } + + @Override + public boolean cancel() { + cancelCalled = true; + return false; + } + } + List l = new ArrayList(count); + List> fs; + Set names = new HashSet(count); + for (int i = 0; i < count; i++) { + String name = "R" + i; + names.add(name); + C c = new C(name); + l.add(c); + } + fs = rp.invokeAll(l); + assertNotNull(fs); + Set s = new HashSet(count); + for (Future f : fs) { + s.add(f.get()); + } + assertEquals(names, s); + } + + public void testCannotScheduleLongerThanIntegerMaxValue() throws Exception { + Runnable r = new Runnable() { + + @Override + public void run() { + fail ("Should not have been run"); + } + }; + try { + Future f = RequestProcessor.getDefault().schedule(r, Long.MAX_VALUE, TimeUnit.DAYS); + f.cancel(true); + } catch (Exception e) {} + } + + public void testCannotScheduleNegativeDelay() throws Exception { + Runnable r = new Runnable() { + + @Override + public void run() { + fail ("Should not have been run"); + } + }; + try { + RequestProcessor.getDefault().schedule(r, -1L, TimeUnit.MILLISECONDS); + fail ("Negative value accepetd"); + } catch (Exception e) {} + try { + RequestProcessor.getDefault().scheduleAtFixedRate(r, -1L, 22L, TimeUnit.MILLISECONDS); + fail ("Negative value accepetd"); + } catch (Exception e) {} + try { + RequestProcessor.getDefault().scheduleAtFixedRate(r, 200, -22L, TimeUnit.MILLISECONDS); + fail ("Negative value accepetd"); + } catch (Exception e) {} + try { + RequestProcessor.getDefault().scheduleWithFixedDelay(r, -1L, 22L, TimeUnit.MILLISECONDS); + fail ("Negative value accepetd"); + } catch (Exception e) {} + try { + RequestProcessor.getDefault().scheduleWithFixedDelay(r, 1L, -22L, TimeUnit.MILLISECONDS); + fail ("Negative value accepetd"); + } 
catch (Exception e) {} + } + + public void testTaskCanRescheduleItself() throws Exception { + final CountDownLatch latch = new CountDownLatch(2); + class R implements Runnable { + volatile RequestProcessor.Task task; + volatile int runCount; + @Override + public void run() { + runCount++; + if (runCount == 1) { + task.schedule(0); + } + latch.countDown(); + } + } + R r = new R(); + RequestProcessor.Task t = RequestProcessor.getDefault().create(r); + r.task = t; + t.schedule(0); + latch.await (); + assertEquals (r.runCount, 2); + } + + public void testScheduleRepeatingSanityFixedRate() throws Exception { + final CountDownLatch latch = new CountDownLatch(5); + class C implements Runnable { + volatile int runCount; + @Override + public void run() { + runCount++; + latch.countDown(); + } + } + C c = new C(); + RequestProcessor.getDefault().scheduleWithFixedDelay(c, 0, 20, TimeUnit.MILLISECONDS); +// latch.await(5000, TimeUnit.MILLISECONDS); + latch.await(); + assertEquals (5, c.runCount); + } + + public void testScheduleRepeatingSanityFixedDelay() throws Exception { + final CountDownLatch latch = new CountDownLatch(5); + class C implements Runnable { + volatile int runCount; + @Override + public void run() { + runCount++; + latch.countDown(); + } + } + C c = new C(); + RequestProcessor.getDefault().scheduleAtFixedRate(c, 0, 20, TimeUnit.MILLISECONDS); + latch.await(2000, TimeUnit.MILLISECONDS); + + assertEquals (5, c.runCount); + } + + public void testScheduleOneShot() throws Exception { + RequestProcessor rp = new RequestProcessor ("testScheduleOneShot", 5, true, true); + try { + class C implements Callable { + volatile long start = System.currentTimeMillis(); + private volatile long end; + + @Override + public String call() throws Exception { + synchronized(this) { + end = System.currentTimeMillis(); + } + return "Hello"; + } + + synchronized long elapsed() { + return end - start; + } + } + C c = new C(); + int delay = 5000; + //Use a 20 second timeout to have a 
reasonable chance of accuracy + ScheduledFuture f = rp.schedule(c, delay, TimeUnit.MILLISECONDS); + assertEquals (5000, f.getDelay(TimeUnit.MILLISECONDS)); + assertNotNull(f.get()); + //Allow 4 seconds fudge-factor + assertTrue (c.elapsed() > 4600); + assertTrue (f.isDone()); + } finally { + rp.stop(); + } + } + + public void testScheduleRepeatingIntervalsAreRoughlyCorrect() throws Exception { + int runCount = 5; + final CountDownLatch latch = new CountDownLatch(runCount); + final List intervals = Collections.synchronizedList(new ArrayList (runCount)); +// long initialDelay = 30000; +// long period = 20000; +// long fudgeFactor = 4000; + long initialDelay = 3000; + long period = 2000; + long fudgeFactor = 400; + long expectedInitialDelay = initialDelay - fudgeFactor; + long expectedPeriod = period - fudgeFactor; + class C implements Runnable { + volatile long start = System.currentTimeMillis(); + private int runCount; + @Override + public void run() { + runCount++; + try { + synchronized(this) { + long end = System.currentTimeMillis(); + intervals.add (end - start); + start = end; + } + } finally { + latch.countDown(); + } + } + } + C c = new C(); + RequestProcessor rp = new RequestProcessor ("testScheduleRepeating", 5, true); + try { + Future f = rp.scheduleWithFixedDelay(c, initialDelay, period, TimeUnit.MILLISECONDS); + // latch.await(initialDelay + fudgeFactor + ((runCount - 1) * (period + fudgeFactor)), TimeUnit.MILLISECONDS); //XXX + latch.await(); + f.cancel(true); + for (int i= 0; i < Math.min(runCount, intervals.size()); i++) { + long expect = i == 0 ? 
expectedInitialDelay : expectedPeriod; + assertTrue ("Expected at least " + expect + " milliseconds before run " + i + " but was " + intervals.get(i), intervals.get(i) >= expect); + } + //Ensure we have really exited + try { + f.get(); + fail ("CancellationException should have been thrown"); + } catch (CancellationException e) {} + assertTrue(f.isCancelled()); + assertTrue(f.isDone()); + } finally { + rp.stop(); + } + } + + public void testScheduleFixedRateAreRoughlyCorrect() throws Exception { + int runCount = 5; + final CountDownLatch latch = new CountDownLatch(runCount); + final List intervals = Collections.synchronizedList(new ArrayList (runCount)); +// long initialDelay = 30000; +// long period = 20000; +// long fudgeFactor = 4000; + long initialDelay = 3000; + long period = 2000; + long fudgeFactor = 400; + long expectedInitialDelay = initialDelay - fudgeFactor; + long expectedPeriod = period - fudgeFactor; + class C implements Runnable { + volatile long start = System.currentTimeMillis(); + private int runCount; + @Override + public void run() { + runCount++; + try { + synchronized(this) { + long end = System.currentTimeMillis(); + intervals.add (end - start); + start = end; + } + } finally { + latch.countDown(); + } + } + } + C c = new C(); + RequestProcessor rp = new RequestProcessor ("testScheduleFixedRateAreRoughlyCorrect", 5, true); + try { + Future f = rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS); + latch.await(); + f.cancel(true); + for (int i= 0; i < Math.min(runCount, intervals.size()); i++) { + long expect = i == 0 ? 
expectedInitialDelay : expectedPeriod; + assertTrue ("Expected at least " + expect + " milliseconds before run " + i + " but was " + intervals.get(i), intervals.get(i) >= expect); + } + //Ensure we have really exited + try { + f.get(); + fail ("CancellationException should have been thrown"); + } catch (CancellationException e) {} + assertTrue(f.isCancelled()); + assertTrue(f.isDone()); + } finally { + rp.stop(); + } + } + + public void testScheduleFixedRateOnMultiThreadPoolDoesNotCauseConcurrentExecution() throws Exception { + final AtomicInteger val = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(10); + class C implements Runnable { + boolean failed; + @Override + public void run() { + try { + int now = val.incrementAndGet(); + if (now > 1) { + failed = true; + fail (now + " threads simultaneously in run()"); + } + try { + //Intentionally sleep *longer* than the interval + //between executions. We *want* to pile up all of the + //RP threads entering run() - synchronization should + //serialize them. 
This test is to prove that this + //method will never be called concurrently from two threads + Thread.sleep(1000); + } catch (InterruptedException ex) { + + } + } finally { + val.decrementAndGet(); + latch.countDown(); + } + } + } + C c = new C(); + long initialDelay = 2000; + long period = 10; + RequestProcessor rp = new RequestProcessor("testScheduleFixedRateOnMultiThreadPoolDoesNotCauseConcurrentExecution", 10, true); + rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS); + latch.await(); + assertFalse(c.failed); + rp.stop(); + } + + @RandomlyFails + public void testScheduleFixedRateWithShorterIntervalThanRunMethodTimeAreNotDelayed() throws Exception { + final CountDownLatch latch = new CountDownLatch(10); + final List intervals = new CopyOnWriteArrayList(); + class C implements Runnable { + long start = Long.MIN_VALUE; + + @Override + public void run() { + long end = System.currentTimeMillis(); + if (start != Long.MIN_VALUE) { + intervals.add(end - start); + } + try { + Thread.sleep(500); + } catch (InterruptedException ex) { + + } + start = System.currentTimeMillis(); + latch.countDown(); + } + } + C c = new C(); + long initialDelay = 100; + long period = 100; + RequestProcessor rp = new RequestProcessor("testScheduleFixedRateWithShorterIntervalThanRunMethodTimeAreNotDelayed", 10, true); + ScheduledFuture f = rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS); + latch.await(); + f.cancel(true); + rp.stop(); + int max = intervals.size(); + for (int i= 0; i < max; i++) { + long iv = intervals.get(i); + assertFalse ("Interval " + i + " should have been at least less than requested interval * 1.5 with fixed rate" + iv, iv > 150); + } + } + + public void testCancelFutureInterruptsThreadEvenIfRequestProcessorForbidsIt() throws Exception { + RequestProcessor rp = new RequestProcessor ("X", 3, false, true); + final CountDownLatch releaseForRun = new CountDownLatch(1); + final CountDownLatch enterLatch = new CountDownLatch(1); + 
final CountDownLatch exitLatch = new CountDownLatch(1); + class R implements Runnable { + volatile boolean interrupted; + @Override + public void run() { + enterLatch.countDown(); + try { + releaseForRun.await(); + } catch (InterruptedException ex) { + interrupted = true; + } + interrupted |= Thread.interrupted(); + exitLatch.countDown(); + } + } + R r = new R(); + Future f = rp.submit(r); + enterLatch.await(); + f.cancel(true); + assertTrue (f.isCancelled()); + exitLatch.await(); + assertTrue (r.interrupted); + } + + public void testCancelDoesNotInterruptIfNotPassedToFutureDotCancel() throws Exception { + RequestProcessor rp = new RequestProcessor ("X", 3, false, true); + final CountDownLatch releaseForRun = new CountDownLatch(1); + final CountDownLatch enterLatch = new CountDownLatch(1); + final CountDownLatch exitLatch = new CountDownLatch(1); + class R implements Runnable { + volatile boolean interrupted; + @Override + public void run() { + enterLatch.countDown(); + try { + releaseForRun.await(); + } catch (InterruptedException ex) { + interrupted = true; + } + interrupted |= Thread.interrupted(); + exitLatch.countDown(); + } + } + R r = new R(); + Future f = rp.submit(r); + enterLatch.await(); + f.cancel(false); + assertTrue (f.isCancelled()); + assertFalse (r.interrupted); + } + + public void testCancelDoesInterruptIfRequestProcessorSpecifiesItEvenIfFalsePassedToFutureDotCancel() throws Exception { + RequestProcessor rp = new RequestProcessor ("X", 3, true, true); + final CountDownLatch releaseForRun = new CountDownLatch(1); + final CountDownLatch enterLatch = new CountDownLatch(1); + final CountDownLatch exitLatch = new CountDownLatch(1); + class R implements Runnable { + volatile boolean interrupted; + @Override + public void run() { + enterLatch.countDown(); + try { + releaseForRun.await(); + } catch (InterruptedException ex) { + interrupted = true; + } + interrupted |= Thread.interrupted(); + exitLatch.countDown(); + } + } + R r = new R(); + Future f = 
rp.submit(r); + enterLatch.await(); + f.cancel(false); + assertTrue (f.isCancelled()); + exitLatch.await(); + assertTrue (r.interrupted); + } +}