This Bugzilla instance is a read-only archive of historic NetBeans bug reports. To report a bug in NetBeans please follow the project's instructions for reporting issues.

View | Details | Raw Unified | Return to bug 180386
Collapse All | Expand All

(-)a/openide.util/manifest.mf (-1 / +1 lines)
Lines 1-5 Link Here
1
Manifest-Version: 1.0
1
Manifest-Version: 1.0
2
OpenIDE-Module: org.openide.util
2
OpenIDE-Module: org.openide.util
3
OpenIDE-Module-Localizing-Bundle: org/openide/util/Bundle.properties
3
OpenIDE-Module-Localizing-Bundle: org/openide/util/Bundle.properties
4
OpenIDE-Module-Specification-Version: 8.1
4
OpenIDE-Module-Specification-Version: 8.2
5
5
(-)a/openide.util/src/org/openide/util/RequestProcessor.java (-2 / +864 lines)
Lines 41-55 Link Here
41
41
42
package org.openide.util;
42
package org.openide.util;
43
43
44
import java.util.ArrayList;
44
import java.util.Arrays;
45
import java.util.Arrays;
46
import java.util.Collection;
45
import java.util.HashSet;
47
import java.util.HashSet;
46
import java.util.LinkedList;
48
import java.util.LinkedList;
47
import java.util.List;
49
import java.util.List;
48
import java.util.ListIterator;
50
import java.util.ListIterator;
51
import java.util.Set;
49
import java.util.Stack;
52
import java.util.Stack;
50
import java.util.Timer;
53
import java.util.Timer;
51
import java.util.TimerTask;
54
import java.util.TimerTask;
55
import java.util.concurrent.Callable;
56
import java.util.concurrent.CancellationException;
57
import java.util.concurrent.CountDownLatch;
58
import java.util.concurrent.Delayed;
59
import java.util.concurrent.ExecutionException;
52
import java.util.concurrent.Executor;
60
import java.util.concurrent.Executor;
61
import java.util.concurrent.Future;
62
import java.util.concurrent.FutureTask;
63
import java.util.concurrent.RejectedExecutionException;
64
import java.util.concurrent.ScheduledExecutorService;
65
import java.util.concurrent.ScheduledFuture;
66
import java.util.concurrent.TimeUnit;
67
import java.util.concurrent.TimeoutException;
68
import java.util.concurrent.atomic.AtomicBoolean;
69
import java.util.concurrent.atomic.AtomicLong;
70
import java.util.concurrent.atomic.AtomicReference;
53
import java.util.logging.Level;
71
import java.util.logging.Level;
54
import java.util.logging.Logger;
72
import java.util.logging.Logger;
55
73
Lines 157-163 Link Here
157
 * 
175
 * 
158
 * @author Petr Nejedly, Jaroslav Tulach
176
 * @author Petr Nejedly, Jaroslav Tulach
159
 */
177
 */
160
public final class RequestProcessor implements Executor {
178
public final class RequestProcessor implements Executor, ScheduledExecutorService {
161
    /** the static instance for users that do not want to have own processor */
179
    /** the static instance for users that do not want to have own processor */
162
    private static RequestProcessor DEFAULT = new RequestProcessor();
180
    private static RequestProcessor DEFAULT = new RequestProcessor();
163
181
Lines 186-192 Link Here
186
204
187
    /** If the RP was stopped, this variable will be set, every new post()
205
    /** If the RP was stopped, this variable will be set, every new post()
188
     * will throw an exception and no task will be processed any further */
206
     * will throw an exception and no task will be processed any further */
189
    boolean stopped = false;
207
    volatile boolean stopped = false;
190
208
191
    /** The lock covering following four fields. They should be accessed
209
    /** The lock covering following four fields. They should be accessed
192
     * only while having this lock held. */
210
     * only while having this lock held. */
Lines 585-590 Link Here
585
        }
603
        }
586
    }
604
    }
587
605
606
    /**
607
     * Shut down this thread pool as defined in
608
     * {@link java.util.concurrent.ExecutorService#shutdown()}.<p/>If this method
609
     * is called on the <a href="#getDefault()">default RequestProcessor</a>,
610
     * throws an IllegalStateException.
611
     * @since 8.2
612
     */
613
    @Override
614
    public void shutdown() {
615
        if (this == UNLIMITED) {
616
            throw new IllegalStateException ("Cannot shut down the default " + //NOI18N
617
                    "request processor"); //NOI18N
618
        }
619
        stop();
620
    }
621
622
    /**
623
     * Shut down this thread pool as defined in
624
     * {@link java.util.concurrent.ExecutorService#shutdownNow()}, returning
625
     * a list of un-run or unfinished runnables.  <p/> If this method
626
     * is called on the <a href="#getDefault()">default RequestProcessor</a>,
627
     * throws an IllegalStateException.
628
     * @since 8.2
629
     */
630
    @Override
631
    public List<Runnable> shutdownNow() {
632
        if (this == UNLIMITED) {
633
            throw new IllegalStateException ("Cannot shut down the default " + //NOI18N
634
                    "request processor"); //NOI18N
635
        }
636
        //XXX more aggressive shutdown?
637
        stop();
638
        synchronized (processorLock) {
639
            List<Runnable> result = new ArrayList<Runnable>(processors.size());
640
            for (Processor p : processors) {
641
                if (p != null && p.todo != null && p.todo.run != null) {
642
                    Runnable r = p.todo.run;
643
                    if (r instanceof RunnableWrapper) {
644
                        Runnable other = ((RunnableWrapper) r).getRunnable();
645
                        r = other == null ? r : other;
646
                    }
647
                    result.add(r);
648
                }
649
            }
650
            return result;
651
        }
652
    }
653
654
    /**
655
     * Determine if this thread pool has been shut down, in accordance with
656
     * the specification of {@link java.util.concurrent.ExecutorService#shutdown()}
657
     * @return
658
     * @since 8.2
659
     */
660
    @Override
661
    public boolean isShutdown() {
662
        return stopped;
663
    }
664
665
    /**
666
     * Determine if this thread pool has been terminated, in accordance with
667
     * the specification of {@link java.util.concurrent.ExecutorService#isTerminated()}
668
     * @return
669
     * @since 8.2
670
     */
671
    @Override
672
    public boolean isTerminated() {
673
        boolean result = true;
674
        Set<Processor> set = collectProcessors(new HashSet<Processor>());
675
        for (Processor p : set) {
676
            if (p.isAlive() && p.belongsTo(this)) {
677
                result = false;
678
                break;
679
            }
680
        }
681
        return result;
682
    }
683
684
    /**
685
     * Determine if this thread pool has been shut down, in accordance with
686
     * the specification of {@link java.util.concurrent.ExecutorService#shutdown()}
687
     * @return True if the request processor is shut down before the timeout
688
     * @since 8.2
689
     */
690
    @Override
691
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
692
        Parameters.notNull("unit", unit); //NOI18N
693
        long timeoutMillis = unit.convert(timeout, TimeUnit.MILLISECONDS);
694
        boolean result = stopped;
695
        long doneTime = System.currentTimeMillis() + timeoutMillis;
696
        Set<Processor> procs = new HashSet<Processor>();
697
outer:  do {
698
            procs = collectProcessors(procs);
699
            if (procs.isEmpty()) {
700
                return true;
701
            }
702
            for (Processor p : procs) {
703
                long remaining = doneTime - System.currentTimeMillis();
704
                if (remaining <= 0) {
705
                    result = collectProcessors(procs).isEmpty();
706
                    break outer;
707
                }
708
                if (p.belongsTo(this)) {
709
                    p.join(remaining);
710
                }
711
                result = !p.isAlive() || !p.belongsTo(this);
712
            }
713
            procs.clear();
714
        } while (!procs.isEmpty());
715
        return result;
716
    }
717
718
    private Set<Processor> collectProcessors (Set<Processor> procs) {
719
        procs.clear();
720
        synchronized (processorLock) {
721
            for (Processor p : processors) {
722
                if (p.belongsTo(this)) {
723
                    procs.add(p);
724
                }
725
            }
726
        }
727
        return procs;
728
    }
729
730
    /**
731
     * Submit a job to be run, and get back a {@link java.util.concurrent.Future}
732
     * which can be waited on and asked for a result, following the contract of
733
     * {@link java.util.concurrent.ExecutorService#submit(java.util.concurrent.Callable)}
734
     * <p/>
735
     * <b>Note:</b> If the passed {@link java.util.concurrent.Callable} implements
736
     * {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()}
737
     * method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked.
738
     * If <code>Cancellable.cancel()</code> returns false, then <i>the job will <u>not</u> be
739
     * cancelled</i>.
740
     *
741
     * @param <T> The return type of the passed job
742
     * @param task The work to do
743
     * @return A Future that can indicate work status and fetch a result
744
     * @throws RejectedExecutionException if this RequestProcessor has been
745
     * shut down
746
     * @since 8.2
747
     */
748
    @Override
749
    public <T> Future<T> submit(Callable<T> task) {
750
        Parameters.notNull("task", task); //NOI18N
751
        if (stopped) {
752
            throw new RejectedExecutionException("Request Processor already " + //NOI18N
753
                    "stopped"); //NOI18N
754
        }
755
        RPFutureTask<T> result = new RPFutureTask<T>(task);
756
        Task t = create(result);
757
        result.setTask(t);
758
        t.schedule(0);
759
        return result;
760
    }
761
    /**
762
     * Submit a job to be run, and get back a {@link java.util.concurrent.Future}
763
     * which can be waited on and asked for a result, following the contract of
764
     * {@link java.util.concurrent.ExecutorService#submit(java.lang.Runnable, T)}
765
     * <p/>
766
     * <b>Note:</b> If the passed {@link java.lang.Runnable} implements
767
     * {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()}
768
     * method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked.
769
     * If <code>Cancellable.cancel()</code> returns false, then <i>the job will <u>not</u> be
770
     * cancelled</i>.
771
     *
772
     * @param <T> The return type of the passed job
773
     * @param task The work to do
774
     * @param predefinedResult The return value for the resulting Future's
775
     * <code>get()</code> method
776
     * @return A Future that can indicate work status and fetch the result
777
     * @throws RejectedExecutionException if this RequestProcessor has been
778
     * shut down
779
     * @since 8.2
780
     */
781
    @Override
782
    public <T> Future<T> submit(Runnable task, T predefinedResult) {
783
        Parameters.notNull("task", task); //NOI18N
784
        if (stopped) {
785
            throw new RejectedExecutionException("Request Processor already " + //NOI18N
786
                    "stopped"); //NOI18N
787
        }
788
        RPFutureTask<T> result = new RPFutureTask<T>(task, predefinedResult);
789
        Task t = create(result);
790
        result.setTask(t);
791
        t.schedule(0);
792
        return result;
793
    }
794
795
    /**
796
     * Submit a job to be run, and get back a {@link java.util.concurrent.Future}
797
     * which can be waited on and asked for a result, following the contract of
798
     * {@link java.util.concurrent.ExecutorService#submit(java.lang.Runnable)}
799
     * <p/>
800
     * <b>Note:</b> If the passed {@link java.lang.Runnable} implements
801
     * {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()}
802
     * method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked.
803
     * If <code>Cancellable.cancel()</code> returns false, then <i>the job will <u>not</u> be
804
     * cancelled</i>.
805
     *
806
     * @param <T> The return type of the passed job
807
     * @param task The work to do
808
     * @return A Future that can indicate work status and fetch a result
809
     * @throws RejectedExecutionException if this RequestProcessor has been
810
     * shut down
811
     * @since 8.2
812
     */
813
    @Override
814
    public Future<?> submit(Runnable task) {
815
        return this.<Void>submit (task, null);
816
    }
817
818
    /**
819
     * Invoke a collection of tasks, as defined in the specification of
820
     * {@link java.util.concurrent.ExecutorService#invokeAll(java.util.Collection)}
821
     * @param <T> The type of returned object
822
     * @param tasks A collection of Callables with the same return type
823
     * @return A list of futures which can be monitored for completion
824
     * @throws InterruptedException If the timeout expires
825
     * @since 8.2
826
     */
827
    @Override
828
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
829
        Parameters.notNull("tasks", tasks); //NOI18N
830
        List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
831
        CountDownLatch wait = new CountDownLatch(tasks.size());
832
        for (Callable<T> c : tasks) {
833
            if (c == null) {
834
                    throw new NullPointerException ("Contains null tasks: " +  //NOI18N
835
                            tasks);
836
            }
837
            Callable<T> delegate = new WaitableCallable<T>(c, wait);
838
            result.add (submit(delegate));
839
        }
840
        wait.await();
841
        return result;
842
    }
843
844
    /**
845
     * Executes the given tasks, returning a list of Futures holding their
846
     * status and results when all complete or the timeout expires, whichever
847
     * happens first, as specified in
848
     * {@link java.util.concurrent.ExecutorService#invokeAll((java.util.Collection,long,java.util.concurrent.TimeUnit))}
849
     * @param <T> The result type
850
     * @param tasks A collection of callables
851
     * @param timeout The maximum time to wait for completion, in the specified time units
852
     * @param unit The time unit
853
     * @return A list of futures
854
     * @throws InterruptedException if the timeout expires
855
     * @since 8.2
856
     */
857
    @Override
858
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
859
        Parameters.notNull("unit", unit); //NOI18N
860
        Parameters.notNull("tasks", tasks); //NOI18N
861
        CountDownLatch wait = new CountDownLatch(tasks.size());
862
        List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
863
        for (Callable<T> c : tasks) {
864
            if (c == null) {
865
                throw new NullPointerException ("Contains null tasks: " + tasks); //NOI18N
866
            }
867
            Callable<T> delegate = new WaitableCallable<T>(c, wait);
868
            result.add (submit(delegate));
869
        }
870
        if (!wait.await(timeout, unit)) {
871
            for (Future<T> f : result) {
872
                RPFutureTask<?> ft = (RPFutureTask<?>) f;
873
                ft.cancel(true);
874
            }
875
        }
876
        return result;
877
    }
878
    /**
879
     * Executes the given tasks, returning the result of one which has
880
     * completed and cancelling any incomplete tasks, as specified in
881
     * {@link java.util.concurrent.ExecutorService#invokeAny((java.util.Collection))}
882
     * @param <T> The result type
883
     * @param tasks A collection of callables
884
     * @return A list of futures
885
     * @throws InterruptedException if execution is interrupted
886
     * @since 8.2
887
     */
888
    @Override
889
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
890
        Parameters.notNull("tasks", tasks); //NOI18N
891
        CountDownLatch wait = new CountDownLatch(1);
892
        List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
893
        AtomicReference<T> ref = new AtomicReference<T>();
894
        try {
895
            for (Callable<T> c : tasks) {
896
                if (c == null) {
897
                    throw new NullPointerException ("Contains null tasks: " +  //NOI18N
898
                            tasks);
899
                }
900
                Callable<T> delegate = new WaitableCallable<T>(c, ref, wait);
901
                result.add (submit(delegate));
902
            }
903
            wait.await();
904
        } finally {
905
            for (Future<T> f : result) {
906
                RPFutureTask<?> ft = (RPFutureTask<?>) f;
907
                ft.cancel(true);
908
            }
909
        }
910
        return ref.get();
911
    }
912
    /**
913
     * Executes the given tasks, returning a list of Futures holding their
914
     * status and results when all complete or the timeout expires, whichever
915
     * happens first, as specified in
916
     * {@link java.util.concurrent.ExecutorService#invokeAny((java.util.Collection))}
917
     * @param <T> The result type
918
     * @param tasks A collection of callables
919
     * @param timeout The maximum time to wait for completion, in the specified time units
920
     * @param unit The time unit
921
     * @return A list of futures
922
     * @throws InterruptedException if the timeout expires or execution is interrupted
923
     * @since 8.2
924
     */
925
    @Override
926
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
927
        Parameters.notNull("unit", unit); //NOI18N
928
        Parameters.notNull("tasks", tasks); //NOI18N
929
        CountDownLatch wait = new CountDownLatch(1);
930
        List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
931
        AtomicReference<T> ref = new AtomicReference<T>();
932
        try {
933
            for (Callable<T> c : tasks) {
934
                if (c == null) {
935
                    throw new NullPointerException ("Contains null tasks: " +  //NOI18N
936
                            tasks);
937
                }
938
                Callable<T> delegate = new WaitableCallable(c, ref, wait);
939
                result.add (submit(delegate));
940
            }
941
            wait.await(timeout, unit);
942
        } finally {
943
            for (Future<T> f : result) {
944
                RPFutureTask ft = (RPFutureTask) f;
945
                ft.cancel(true);
946
            }
947
        }
948
        return ref.get();
949
    }
950
951
    /**
952
     * Schedule a runnable to execute after the specified delay.
953
     * @param command The runnable
954
     * @param delay The delay
955
     * @param unit The time units of the delay
956
     * @return A future which can be monitored for completion
957
     * @since 8.2
958
     */
959
    @Override
960
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
961
        Parameters.notNull("command", command); //NOI18N
962
        Parameters.notNull("unit", unit); //NOI18N
963
        if (delay < 0) {
964
            throw new IllegalArgumentException ("Negative delay: " + delay);
965
        }
966
        if (stopped) {
967
            throw new RejectedExecutionException("Request Processor already stopped"); //NOI18N
968
        }
969
        long delayMillis = unit.convert(delay, TimeUnit.MILLISECONDS);
970
        if (delayMillis > Integer.MAX_VALUE) {
971
            throw new IllegalArgumentException ("Requested delay " + delayMillis +  //NOI18N
972
                    " is > Integer.MAX_VALUE"); //NOI18N
973
        }
974
        ScheduledRPFutureTask<Void> result = new ScheduledRPFutureTask<Void>(command, null, delayMillis);
975
        Task t = create(result);
976
        result.setTask(t);
977
        t.schedule((int) delayMillis);
978
        return result;
979
    }
980
    /**
981
     * Schedule a {@link java.util.concurrent.Callable} to execute on another thread
982
     * after the specified delay.
983
     * @param command The work to run
984
     * @param delay The delay
985
     * @param unit The time units of the delay
986
     * @return A future which can be monitored for completion
987
     * @since 8.2
988
     */
989
    @Override
990
    public <T> ScheduledFuture<T> schedule(Callable<T> callable, long delay, TimeUnit unit) {
991
        Parameters.notNull("unit", unit); //NOI18N
992
        Parameters.notNull("callable", callable); //NOI18N
993
        if (delay < 0) {
994
            throw new IllegalArgumentException ("Negative delay: " + delay);
995
        }
996
        if (stopped) {
997
            throw new RejectedExecutionException("Request Processor already " + //NOI18N
998
                    "stopped"); //NOI18N
999
        }
1000
        long delayMillis = unit.convert(delay, TimeUnit.MILLISECONDS);
1001
        if (delayMillis > Integer.MAX_VALUE) {
1002
            throw new IllegalArgumentException ("Requested delay " +  //NOI18N
1003
                    delayMillis + " is > Integer.MAX_VALUE"); //NOI18N
1004
        }
1005
        ScheduledRPFutureTask<T> result = new ScheduledRPFutureTask<T>(callable, delayMillis);
1006
        Task t = create(result);
1007
        result.setTask(t);
1008
        t.schedule((int) delayMillis);
1009
        return result;
1010
    }
1011
1012
    /**
1013
     * Schedule a runnable which will run with a given frequency, regardless
1014
     * of how long execution takes, with the exception that if execution takes
1015
     * longer than the specified delay, execution will be delayed but will
1016
     * never be run on two threads concurrently.
1017
     * As specified in
1018
     * {@link java.util.concurrent.ExecutorService.scheduleAtFixedRate(java.lang.Runnable,long,long,java.util.concurrent.TimeUnit)}
1019
     * @param command The runnable
1020
     * @param initialDelay The delay before first running
1021
     * @param period The frequency with which the runnable should run after the
1022
     * first run
1023
     * @param unit The time units in which the initial delay and period are
1024
     * specified
1025
     * @return A future which can be monitored for completion, or cancelled
1026
     * @since 8.2
1027
     */
1028
    @Override
1029
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
1030
        return scheduleFixed(command, initialDelay, period, unit, false);
1031
    }
1032
1033
    /**
1034
     * Schedule a runnable which will run repeatedly after the specified initial
1035
     * delay, with the specified delay between the completion of one run and
1036
     * the start of the next.
1037
     * As specified in
1038
     * {@link java.util.concurrent.ExecutorService.scheduleWithFixedDelay(java.lang.Runnable,long,long,java.util.concurrent.TimeUnit)}
1039
     * @param command The runnable
1040
     * @param initialDelay The delay before first run
1041
     * @param delay The delay between runs
1042
     * @param unit The time units of the delay parameters
1043
     * @return A future which can be monitored for completion, or cancelled
1044
     * @since 8.2
1045
     */
1046
    @Override
1047
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
1048
        return scheduleFixed(command, initialDelay, delay, unit, true);
1049
    }
1050
1051
    private ScheduledFuture<?> scheduleFixed (Runnable command, long initialDelay, long period, TimeUnit unit, boolean fixedDelay) {
1052
        Parameters.notNull("unit", unit); //NOI18N
1053
        Parameters.notNull("command", command); //NOI18N
1054
        if (period < 0) {
1055
            throw new IllegalArgumentException ("Negative delay: " + period); //NOI18N
1056
        }
1057
        if (initialDelay < 0) {
1058
            throw new IllegalArgumentException ("Negative initialDelay: "  //NOI18N
1059
                    + initialDelay);
1060
        }
1061
        if (stopped) {
1062
            throw new RejectedExecutionException("Request Processor already " + //NOI18N
1063
                    "stopped"); //NOI18N
1064
        }
1065
        long initialDelayMillis = unit.convert(initialDelay, TimeUnit.MILLISECONDS);
1066
        if (initialDelayMillis > Integer.MAX_VALUE) {
1067
            throw new IllegalArgumentException("Initial delay > " + //NOI18N
1068
                    "Integer.MAX_VALUE milliseconds: " + initialDelayMillis); //NOI18N
1069
        }
1070
        long periodMillis = unit.convert(period, TimeUnit.MILLISECONDS);
1071
        if (periodMillis > Integer.MAX_VALUE) {
1072
            throw new IllegalArgumentException("Initial delay > " + //NOI18N
1073
                    "Integer.MAX_VALUE milliseconds: " + periodMillis); //NOI18N
1074
        }
1075
1076
        TaskFutureWrapper wrap = fixedDelay ? 
1077
            new FixedDelayTask(command, initialDelayMillis, periodMillis) :
1078
            new FixedRateTask(command, initialDelay, periodMillis);
1079
        Task t = create(wrap);
1080
        wrap.t = t;
1081
        t.cancelled = wrap.cancelled;
1082
        t.schedule ((int) initialDelayMillis);
1083
1084
        return wrap;
1085
    }
1086
1087
    private static abstract class TaskFutureWrapper implements ScheduledFuture<Void>, Runnable, RunnableWrapper {
1088
        volatile Task t;
1089
        protected final Runnable toRun;
1090
        protected final long initialDelay;
1091
        protected final long period;
1092
        final AtomicBoolean cancelled = new AtomicBoolean();
1093
        TaskFutureWrapper(Runnable run, long initialDelay, long period) {
1094
            this.toRun = run;
1095
            this.initialDelay = initialDelay;
1096
            this.period = period;
1097
        }
1098
1099
        @Override
1100
        public final Runnable getRunnable() {
1101
            return toRun;
1102
        }
1103
1104
        @Override
1105
        public int compareTo(Delayed o) {
1106
            long other = o.getDelay(TimeUnit.MILLISECONDS);
1107
            long ours = getDelay(TimeUnit.MILLISECONDS);
1108
            //Might overflow on, say, ms compared to Long.MAX_VALUE, TimeUnit.DAYS
1109
            return (int) (ours - other);
1110
        }
1111
1112
        @Override
1113
        public boolean cancel(boolean mayInterruptIfRunning) {
1114
            boolean result = true;
1115
            if (toRun instanceof Cancellable) {
1116
                result = ((Cancellable) toRun).cancel();
1117
            }
1118
            if (result) {
1119
                //will invoke cancelled.set(true)
1120
                result = t.cancel(mayInterruptIfRunning);
1121
            }
1122
            return result;
1123
        }
1124
1125
        @Override
1126
        public boolean isCancelled() {
1127
            return cancelled.get();
1128
        }
1129
1130
        @Override
1131
        public boolean isDone() {
1132
            return cancelled.get() || t.isFinished();
1133
        }
1134
1135
        @Override
1136
        public Void get() throws InterruptedException, ExecutionException {
1137
            if (cancelled.get()) {
1138
                throw new CancellationException();
1139
            }
1140
            t.waitFinished();
1141
            if (cancelled.get()) {
1142
                throw new CancellationException();
1143
            }
1144
            return null;
1145
        }
1146
1147
        @Override
1148
        public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1149
            if (cancelled.get()) {
1150
                throw new CancellationException();
1151
            }
1152
            long millis = unit.convert(timeout, TimeUnit.MILLISECONDS);
1153
            t.waitFinished(millis);
1154
            if (cancelled.get()) {
1155
                throw new CancellationException();
1156
            }
1157
            return null;
1158
        }
1159
    }
1160
1161
    private static final class FixedRateTask extends TaskFutureWrapper {
1162
        private final Object runLock = new Object();
1163
        private final Object timeLock = new Object();
1164
        //must be accessed holding timeLock
1165
        private int runCount;
1166
        private long nextRunTime;
1167
        private long start = Long.MIN_VALUE;
1168
        volatile boolean firstRun = true;
1169
        FixedRateTask (Runnable run, long initialDelay, long period) {
1170
            super (run, initialDelay, period);
1171
        }
1172
1173
        @Override
1174
        public void run() {
1175
            if (firstRun) {
1176
                synchronized (timeLock) {
1177
                    start = System.currentTimeMillis();
1178
                    firstRun = false;
1179
                }
1180
            }
1181
            try {
1182
                synchronized(runLock) {
1183
                    toRun.run();
1184
                }
1185
            } catch (RuntimeException e) {
1186
                cancel(true);
1187
                throw e;
1188
            }
1189
            reschedule();
1190
        }
1191
1192
        private void reschedule() {
1193
            //All access to nextRunTime & runCount under lock.
1194
            long interval;
1195
            synchronized (timeLock) {
1196
                nextRunTime = start + (initialDelay + period * runCount);
1197
                runCount++;
1198
                interval = Math.max(0,  nextRunTime - System.currentTimeMillis());
1199
                if (interval > Integer.MAX_VALUE) {
1200
                    throw new IllegalStateException ("Interval > Integer.MAX_VALUE: " + interval); //NOI18N
1201
                }
1202
            }
1203
            boolean canContinue = !cancelled.get() && !Thread.currentThread().isInterrupted();
1204
            if (canContinue) {
1205
                t.schedule((int) interval);
1206
            }
1207
        }
1208
1209
        @Override
1210
        public long getDelay(TimeUnit unit) {
1211
            if (isCancelled()) {
1212
                return Long.MAX_VALUE;
1213
            }
1214
            long delay;
1215
            synchronized (timeLock) {
1216
                delay = Math.min(0, nextRunTime - System.currentTimeMillis());
1217
            }
1218
            return unit.convert(delay, TimeUnit.MILLISECONDS);
1219
        }
1220
    }
1221
1222
    private static final class FixedDelayTask extends TaskFutureWrapper {
1223
        private volatile boolean firstRun = true;
1224
        private final AtomicLong nextRunTime = new AtomicLong();
1225
        FixedDelayTask(Runnable run, long initialDelay, long period)  {
1226
            super (run, initialDelay, period);
1227
        }
1228
1229
        @Override
1230
        public long getDelay(TimeUnit unit) {
1231
            long next = nextRunTime.get();
1232
            return unit.convert (next - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
1233
        }
1234
1235
        @Override
1236
        public void run() {
1237
            if (!fini()) {
1238
                toRun.run();
1239
            }
1240
            if (!fini()) {
1241
                reschedule();
1242
            }
1243
        }
1244
1245
        private boolean fini() {
1246
            boolean result = cancelled.get() || Thread.currentThread().isInterrupted();
1247
            return result;
1248
        }
1249
1250
        private void reschedule() {
1251
            long delay;
1252
            if (firstRun) {
1253
                delay = initialDelay;
1254
            } else {
1255
                delay = period;
1256
            }
1257
            nextRunTime.set(System.currentTimeMillis() + delay);
1258
            firstRun = false;
1259
            if (!fini()) {
1260
                t.schedule((int) delay);
1261
            }
1262
        }
1263
    }
1264
1265
    private interface RunnableWrapper {
1266
        Runnable getRunnable();
1267
    }
1268
1269
    private static final class WaitableCallable<T> implements Callable<T>, Cancellable {
1270
        private final CountDownLatch countdown;
1271
        private final Callable<T> delegate;
1272
        private final AtomicReference<T> ref;
1273
        private volatile boolean failed;
1274
        WaitableCallable(Callable<T> delegate, CountDownLatch countdown) {
1275
            this (delegate, null, countdown);
1276
        }
1277
1278
        WaitableCallable(Callable<T> delegate, AtomicReference<T> ref, CountDownLatch countdown) {
1279
            this.delegate = delegate;
1280
            this.countdown = countdown;
1281
            this.ref = ref;
1282
        }
1283
1284
        boolean failed() {
1285
            return failed;
1286
        }
1287
1288
        @Override
1289
        public T call() throws Exception {
1290
            try {
1291
                T result = delegate.call();
1292
                if (ref != null) {
1293
                    ref.set(result);
1294
                }
1295
                return result;
1296
            } catch (RuntimeException e) {
1297
                failed = true;
1298
                throw e;
1299
            } catch (Error e) {
1300
                failed = true;
1301
                throw e;
1302
            } finally {
1303
                if (!failed || ref == null) {
1304
                    countdown.countDown();
1305
                }
1306
            }
1307
        }
1308
1309
        @Override
1310
        public boolean cancel() {
1311
            return delegate instanceof Cancellable ? ((Cancellable) delegate).cancel() : true;
1312
        }
1313
    }
1314
1315
    private static class RPFutureTask<T> extends FutureTask<T> implements RunnableWrapper {
1316
        protected volatile Task task;
1317
        private final Runnable runnable;
1318
        private final Cancellable cancellable;
1319
        RPFutureTask(Callable<T> c) {
1320
            super (c);
1321
            this.runnable = null;
1322
            this.cancellable = c instanceof Cancellable ? (Cancellable) c : null;
1323
        }
1324
1325
        RPFutureTask(Runnable r, T result) {
1326
            super (r, result);
1327
            this.runnable = r;
1328
            this.cancellable = r instanceof Cancellable ? (Cancellable) r : null;
1329
        }
1330
1331
        void setTask(Task task) {
1332
            this.task = task;
1333
        }
1334
1335
        RPFutureTask(Callable<T> c, T predefinedResult) {
1336
            this (c);
1337
            set(predefinedResult);
1338
        }
1339
1340
        @Override
1341
        public boolean cancel(boolean mayInterruptIfRunning) {
1342
            if (cancellable != null) {
1343
                boolean result = cancellable.cancel();
1344
                if (result) {
1345
                    //note & not && - task.cancel() and super.cancel() must be invoked,
1346
                    //if and only if the underlying cancellable is really null or
1347
                    //returned true from cancel().
1348
                    return task.cancel() & super.cancel(mayInterruptIfRunning);
1349
                } else {
1350
                    return result;
1351
                }
1352
            } else {
1353
                return task.cancel() & super.cancel(mayInterruptIfRunning);
1354
            }
1355
        }
1356
1357
        @Override
1358
        public Runnable getRunnable() {
1359
            return this.runnable;
1360
        }
1361
    }
1362
1363
    private static final class ScheduledRPFutureTask<T> extends RPFutureTask<T> implements ScheduledFuture<T> {
1364
        protected final long delayMillis;
1365
        ScheduledRPFutureTask(Callable<T> c, long delayMillis) {
1366
            super (c);
1367
            this.delayMillis = delayMillis;
1368
        }
1369
1370
        ScheduledRPFutureTask(Runnable r, T result, long delayMillis) {
1371
            super (r, result);
1372
            this.delayMillis = delayMillis;
1373
        }
1374
1375
        
1376
        @Override
1377
        public long getDelay(TimeUnit unit) {
1378
            return TimeUnit.MILLISECONDS.convert(delayMillis, unit);
1379
        }
1380
1381
        @Override
1382
        public int compareTo(Delayed o) {
1383
            //Can overflow, if one delay is, say, days, and the other, microseconds
1384
            long otherDelayMillis = o.getDelay(TimeUnit.MILLISECONDS);
1385
            return (int) (delayMillis - otherDelayMillis);
1386
        }
1387
    }
1388
588
    private class EnqueueTask extends TimerTask {
1389
    private class EnqueueTask extends TimerTask {
589
        Item itm;
1390
        Item itm;
590
        
1391
        
Lines 610-615 Link Here
610
        private int priority = Thread.MIN_PRIORITY;
1411
        private int priority = Thread.MIN_PRIORITY;
611
        private long time = 0;
1412
        private long time = 0;
612
        private Thread lastThread = null;
1413
        private Thread lastThread = null;
1414
        private AtomicBoolean cancelled;
613
1415
614
        /** @param run runnable to start
1416
        /** @param run runnable to start
615
        * @param delay amount of millis to wait
1417
        * @param delay amount of millis to wait
Lines 685-690 Link Here
685
            final Item localItem;
1487
            final Item localItem;
686
1488
687
            synchronized (processorLock) {
1489
            synchronized (processorLock) {
1490
                if (cancelled != null) {
1491
                    cancelled.set(false);
1492
                }
688
                notifyRunning();
1493
                notifyRunning();
689
1494
690
                if (item != null) {
1495
                if (item != null) {
Lines 747-752 Link Here
747
            }
1552
            }
748
        }
1553
        }
749
1554
1555
        /**
1556
         * Implementation of cancel for use with Future objects, to guarantee
1557
         * that the thread will be interrupted, no matter what the setting
1558
         * on the owning RP.
1559
         * @param interrupt If true, the thread should be interrupted
1560
         * @return true if cancellation occurred
1561
         */
1562
        boolean cancel (boolean interrupt) {
1563
            synchronized (processorLock) {
1564
                if (cancelled != null) {
1565
                    boolean wasCancelled = !cancelled.getAndSet(true);
1566
                    if (wasCancelled) {
1567
                        return false;
1568
                    }
1569
                }
1570
                boolean success;
1571
1572
                if (item == null) {
1573
                    success = false;
1574
                } else {
1575
                    Processor p = item.getProcessor();
1576
                    success = item.clear(null);
1577
1578
                    if (p != null) {
1579
                        if (interrupt) {
1580
                            success = p.interrupt(this, RequestProcessor.this);
1581
                        } else {
1582
                            //despite its name, will not actually interrupt
1583
                            //unless the RP specifies that it should
1584
                            p.interruptTask(this, RequestProcessor.this);
1585
                        }
1586
                        if (success) {
1587
                            item = null;
1588
                        }
1589
                    }
1590
                }
1591
                if (success) {
1592
                    notifyFinished(); // mark it as finished
1593
                }
1594
                return success;
1595
            }
1596
        }
1597
750
        /** Current priority of the task.
1598
        /** Current priority of the task.
751
        * @return the priority level (see e.g. {@link Thread#NORM_PRIORITY}
1599
        * @return the priority level (see e.g. {@link Thread#NORM_PRIORITY}
752
         */
1600
         */
Lines 1062-1067 Link Here
1062
            }
1910
            }
1063
        }
1911
        }
1064
1912
1913
        boolean belongsTo(RequestProcessor r) {
1914
            synchronized (lock) {
1915
                return source == r;
1916
            }
1917
        }
1918
1065
        /**
1919
        /**
1066
         * The method that will repeatedly wait for a request and perform it.
1920
         * The method that will repeatedly wait for a request and perform it.
1067
         */
1921
         */
Lines 1190-1195 Link Here
1190
            }
2044
            }
1191
        }
2045
        }
1192
2046
2047
        boolean interrupt (Task t, RequestProcessor src) {
2048
            if (t != todo) {
2049
                return false;
2050
            }
2051
            interrupt();
2052
            return true;
2053
        }
2054
1193
        /** @see "#20467" */
2055
        /** @see "#20467" */
1194
        private static void doNotify(RequestProcessor.Task todo, Throwable ex) {
2056
        private static void doNotify(RequestProcessor.Task todo, Throwable ex) {
1195
            if (SLOW && todo.item.message == null) {
2057
            if (SLOW && todo.item.message == null) {
(-)a/openide.util/test/unit/src/org/openide/util/RequestProcessor180386Test.java (+1176 lines)
Line 0 Link Here
1
/*
2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3
 *
4
 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
5
 *
6
 * The contents of this file are subject to the terms of either the GNU
7
 * General Public License Version 2 only ("GPL") or the Common
8
 * Development and Distribution License("CDDL") (collectively, the
9
 * "License"). You may not use this file except in compliance with the
10
 * License. You can obtain a copy of the License at
11
 * http://www.netbeans.org/cddl-gplv2.html
12
 * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the
13
 * specific language governing permissions and limitations under the
14
 * License.  When distributing the software, include this License Header
15
 * Notice in each file and include the License file at
16
 * nbbuild/licenses/CDDL-GPL-2-CP.  Sun designates this
17
 * particular file as subject to the "Classpath" exception as provided
18
 * by Sun in the GPL Version 2 section of the License file that
19
 * accompanied this code. If applicable, add the following below the
20
 * License Header, with the fields enclosed by brackets [] replaced by
21
 * your own identifying information:
22
 * "Portions Copyrighted [year] [name of copyright owner]"
23
 *
24
 * If you wish your version of this file to be governed by only the CDDL
25
 * or only the GPL Version 2, indicate your decision by adding
26
 * "[Contributor] elects to include this software in this distribution
27
 * under the [CDDL or GPL Version 2] license." If you do not indicate a
28
 * single choice of license, a recipient has the option to distribute
29
 * your version of this file under either the CDDL, the GPL Version 2 or
30
 * to extend the choice of license to its licensees as provided above.
31
 * However, if you add GPL Version 2 code and therefore, elected the GPL
32
 * Version 2 license, then the option applies only if the new code is
33
 * made subject to such option by the copyright holder.
34
 *
35
 * Contributor(s):
36
 *
37
 * Portions Copyrighted 2010 Sun Microsystems, Inc.
38
 */
39
package org.openide.util;
40
41
import java.util.ArrayList;
42
import java.util.Collections;
43
import java.util.HashSet;
44
import java.util.List;
45
import java.util.Set;
46
import java.util.concurrent.Callable;
47
import java.util.concurrent.CancellationException;
48
import java.util.concurrent.CopyOnWriteArrayList;
49
import java.util.concurrent.CountDownLatch;
50
import java.util.concurrent.Future;
51
import java.util.concurrent.ScheduledFuture;
52
import java.util.concurrent.TimeUnit;
53
import java.util.concurrent.atomic.AtomicInteger;
54
import java.util.logging.Level;
55
import junit.framework.Test;
56
import org.netbeans.junit.NbTestCase;
57
import org.netbeans.junit.NbTestSuite;
58
import org.netbeans.junit.RandomlyFails;
59
60
/**
61
 *
62
 * @author Tim Boudreau
63
 */
64
public class RequestProcessor180386Test extends NbTestCase {
65
66
    /**
     * @param testName name handed straight to the superclass constructor
     */
    public RequestProcessor180386Test(String testName) {
        super(testName);
    }
69
70
    public static Test suite() {
71
        Test t = null;
72
        if (t == null) {
73
            t = new NbTestSuite(RequestProcessor180386Test.class);
74
        }
75
        return t;
76
    }
77
78
    static {
79
        RequestProcessor.logger().setLevel(Level.ALL);
80
    }
81
82
    public void testSubmit() throws Exception {
83
        class C implements Callable<String> {
84
85
            volatile boolean hasRun;
86
87
            @Override
88
            public String call() throws Exception {
89
                String result = "Hello";
90
                hasRun = true;
91
                return result;
92
            }
93
        }
94
        C c = new C();
95
        Future<String> f = RequestProcessor.getDefault().submit(c);
96
        assertEquals("Hello", f.get());
97
        assertTrue(c.hasRun);
98
99
        class R implements Runnable {
100
101
            volatile boolean hasRun;
102
103
            @Override
104
            public void run() {
105
                hasRun = true;
106
            }
107
        }
108
        R r = new R();
109
        f = RequestProcessor.getDefault().submit(r, "Goodbye");
110
        assertEquals("Goodbye", f.get());
111
        assertTrue(r.hasRun);
112
    }
113
114
    public void testSomeTasksNotRunIfShutDown() throws Exception {
115
        final Object lock = new Object();
116
        int count = 10;
117
        final CountDownLatch waitAllLaunched = new CountDownLatch(count);
118
        final CountDownLatch waitOneFinished = new CountDownLatch(1);
119
        RequestProcessor rp = new RequestProcessor("TestRP", count * 2);
120
        class R implements Runnable {
121
122
            volatile boolean hasStarted;
123
            volatile boolean hasFinished;
124
125
            @Override
126
            public void run() {
127
                hasStarted = true;
128
                waitAllLaunched.countDown();
129
                synchronized (lock) {
130
                    try {
131
                        lock.wait();
132
                        if (Thread.interrupted()) {
133
                            return;
134
                        }
135
                    } catch (InterruptedException ex) {
136
                        return;
137
                    } finally {
138
                        waitOneFinished.countDown();
139
                    }
140
                    hasFinished = true;
141
                }
142
            }
143
        }
144
        Set<Future<String>> s = new HashSet<Future<String>>();
145
        Set<R> rs = new HashSet<R>();
146
        for (int i = 0; i < count; i++) {
147
            String currName = "Runnable " + i;
148
            R r = new R();
149
            rs.add(r);
150
            s.add(rp.submit(r, currName));
151
        }
152
        waitAllLaunched.await();
153
        synchronized (lock) {
154
            //Notify just one thread
155
            lock.notify();
156
        }
157
        waitOneFinished.await();
158
        List<Runnable> notRun = rp.shutdownNow();
159
        synchronized (lock) {
160
            lock.notifyAll();
161
        }
162
        boolean allFinished = true;
163
        int finishedCount = 0;
164
        for (R r : rs) {
165
            assertTrue(r.hasStarted);
166
            allFinished &= r.hasFinished;
167
            if (r.hasFinished) {
168
                finishedCount++;
169
            }
170
        }
171
        assertFalse("All tasks should not have completed", allFinished);
172
        assertTrue("At least one task shall complete", finishedCount >= 1);
173
        assertFalse(notRun.isEmpty());
174
        assertTrue(rp.isShutdown());
175
        //Technically not provable due to "spurious wakeups"
176
        //        assertEquals (1, finishedCount);
177
178
        try {
179
            RequestProcessor.getDefault().shutdown();
180
            fail("Should not be able to shutdown() default RP");
181
        } catch (Exception e) {
182
        }
183
        try {
184
            RequestProcessor.getDefault().shutdownNow();
185
            fail("Should not be able to shutdownNow() default RP");
186
        } catch (Exception e) {
187
        }
188
        for (Runnable nr : notRun) {
189
            assertTrue ("Shutdown is not returning submitted runnables - got a "
190
                    + nr.getClass().getName() + " instead of " +
191
                    R.class.getName(), nr.getClass() == R.class);
192
        }
193
    }
194
195
    public void testAwaitTermination() throws Exception {
196
        int count = 20;
197
        final Object lock = new Object();
198
        final CountDownLatch waitAllLaunched = new CountDownLatch(count);
199
        final CountDownLatch waitAll = new CountDownLatch(count);
200
        final RequestProcessor rp = new RequestProcessor("TestRP", count);
201
        class R implements Runnable {
202
203
            volatile boolean hasStarted;
204
            volatile boolean hasFinished;
205
206
            @Override
207
            public void run() {
208
                hasStarted = true;
209
                waitAllLaunched.countDown();
210
                synchronized (lock) {
211
                    try {
212
                        lock.wait();
213
                        if (Thread.interrupted()) {
214
                            return;
215
                        }
216
                    } catch (InterruptedException ex) {
217
                        return;
218
                    } finally {
219
                        hasFinished = true;
220
                        waitAll.countDown();
221
                    }
222
                }
223
            }
224
        }
225
        Set<Future<String>> s = new HashSet<Future<String>>();
226
        Set<R> rs = new HashSet<R>();
227
        for (int i = 0; i < count; i++) {
228
            String currName = "Runnable " + i;
229
            R r = new R();
230
            rs.add(r);
231
            s.add(rp.submit(r, currName));
232
        }
233
        waitAllLaunched.await();
234
        synchronized (lock) {
235
            //Notify just one thread
236
            lock.notifyAll();
237
        }
238
        rp.shutdown();
239
        boolean awaitTermination = rp.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
240
        assertTrue(awaitTermination);
241
        assertTrue(rp.isShutdown());
242
        assertTrue(rp.isTerminated());
243
    }
244
245
    @RandomlyFails
246
    public void testAwaitTerminationWaitsForNewlyAddedThreads() throws Exception {
247
        final RequestProcessor rp = new RequestProcessor("testAwaitTerminationWaitsForNewlyAddedThreads", 50, false);
248
        int count = 30;
249
        final CountDownLatch waitLock = new CountDownLatch(1);
250
        class R implements Runnable {
251
            boolean done;
252
            @Override
253
            public void run() {
254
                try {
255
                    waitLock.await();
256
                } catch (InterruptedException ex) {
257
                    done = true;
258
                } finally {
259
                    done = true;
260
                }
261
            }
262
        }
263
        Set<R> rs = new HashSet<R>();
264
        for (int i= 0; i < count; i++) {
265
            R r = new R();
266
            rs.add(r);
267
            rp.submit(r);
268
        }
269
        final CountDownLatch shutdownBegun = new CountDownLatch(1);
270
        Runnable shutdowner = new Runnable() {
271
            @Override
272
            public void run() {
273
                try {
274
                    Thread.sleep(1000);
275
                    rp.shutdown();
276
                    shutdownBegun.countDown();
277
                } catch (InterruptedException ex) {
278
                    Exceptions.printStackTrace(ex);
279
                }
280
            }
281
        };
282
        waitLock.countDown();
283
        new Thread(shutdowner).start();
284
        assertTrue(rp.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
285
        Thread.sleep (600);
286
        assertTrue (rp.isTerminated());
287
    }
288
289
    public void testInvokeAll() throws Exception {
290
        int count = 20;
291
        final CountDownLatch waitAll = new CountDownLatch(count);
292
        final RequestProcessor rp = new RequestProcessor("TestRP", count);
293
        try {
294
            class C implements Callable<String> {
295
296
                private final String result;
297
                volatile boolean ran;
298
299
                C(String result) {
300
                    this.result = result;
301
                }
302
303
                @Override
304
                public String call() throws Exception {
305
                    ran = true;
306
                    waitAll.countDown();
307
                    return result;
308
                }
309
            }
310
            List<C> callables = new ArrayList<C>(count);
311
            List<Future<String>> fs;
312
            Set<String> names = new HashSet<String>(count);
313
            for (int i = 0; i < count; i++) {
314
                String name = "R" + i;
315
                names.add(name);
316
                C c = new C(name);
317
                callables.add(c);
318
            }
319
            fs = rp.invokeAll(callables);
320
            assertNotNull(fs);
321
            assertEquals(0, waitAll.getCount());
322
            for (Future<String> f : fs) {
323
                assertTrue (f.isDone());
324
            }
325
            for (C c : callables) {
326
                assertTrue (c.ran);
327
            }
328
            Set<String> s = new HashSet<String>(count);
329
            for (Future<String> f : fs) {
330
                s.add(f.get());
331
            }
332
            assertEquals(names, s);
333
        } finally {
334
            rp.stop();
335
        }
336
    }
337
338
    public void testInvokeAllWithTimeout() throws Exception {
339
        int count = 20;
340
        final CountDownLatch blocker = new CountDownLatch(1);
341
        final RequestProcessor rp = new RequestProcessor("TestRP", count);
342
        try {
343
            class C implements Callable<String> {
344
                volatile boolean iAmSpecial;
345
346
                private final String result;
347
                volatile boolean ran;
348
349
                C(String result) {
350
                    this.result = result;
351
                }
352
353
                @Override
354
                public String call() throws Exception {
355
                    //Only one will be allowed to run, the rest
356
                    //will be cancelled
357
                    if (!iAmSpecial) {
358
                        blocker.await();
359
                    }
360
                    ran = true;
361
                    return result;
362
                }
363
            }
364
            List<C> callables = new ArrayList<C>(count);
365
            C special = new C("Special");
366
            special.iAmSpecial = true;
367
            callables.add(special);
368
            List<Future<String>> fs;
369
            Set<String> names = new HashSet<String>(count);
370
            for (int i = 0; i < count; i++) {
371
                String name = "R" + i;
372
                names.add(name);
373
                C c = new C(name);
374
                callables.add(c);
375
            }
376
            fs = rp.invokeAll(callables, 1000, TimeUnit.MILLISECONDS);
377
            assertNotNull(fs);
378
            for (Future<String> f : fs) {
379
                assertTrue (f.isDone());
380
            }
381
            for (C c : callables) {
382
                if (c == special) {
383
                    assertTrue (c.ran);
384
                } else {
385
                    assertFalse(c.ran);
386
                }
387
            }
388
        } finally {
389
            rp.stop();
390
        }
391
    }
392
393
    public void testInvokeAllSingleThread() throws Exception {
394
        int count = 20;
395
        final CountDownLatch waitAll = new CountDownLatch(count);
396
        final RequestProcessor rp = new RequestProcessor("TestRP", 1);
397
        class C implements Callable<String> {
398
399
            private final String result;
400
401
            C(String result) {
402
                this.result = result;
403
            }
404
405
            @Override
406
            public String call() throws Exception {
407
                waitAll.countDown();
408
                return result;
409
            }
410
        }
411
        List<C> l = new ArrayList<C>(count);
412
        List<Future<String>> fs;
413
        Set<String> names = new HashSet<String>(count);
414
        for (int i = 0; i < count; i++) {
415
            String name = "R" + i;
416
            names.add(name);
417
            C c = new C(name);
418
            l.add(c);
419
        }
420
        fs = rp.invokeAll(l);
421
        assertNotNull(fs);
422
        Set<String> s = new HashSet<String>(count);
423
        for (Future<String> f : fs) {
424
            s.add(f.get());
425
        }
426
        assertEquals(names, s);
427
    }
428
429
    public void testInvokeAny() throws Exception {
430
        int count = 20;
431
        final RequestProcessor rp = new RequestProcessor("TestRP", count + 1);
432
        class C implements Callable<String> {
433
434
            private final String result;
435
436
            C(String result) {
437
                this.result = result;
438
            }
439
440
            @Override
441
            public String call() throws Exception {
442
                if (Thread.interrupted()) {
443
                    return null;
444
                }
445
                return result;
446
            }
447
        }
448
        List<C> l = new ArrayList<C>(count);
449
        Set<String> names = new HashSet<String>(count);
450
        for (int i = 0; i < count; i++) {
451
            String name = "R" + i;
452
            names.add(name);
453
            C c = new C(name);
454
            l.add(c);
455
        }
456
        String res = rp.invokeAny(l);
457
        assertNotNull(res);
458
        assertTrue(res.startsWith("R"));
459
    }
460
461
    public void testInvokeAnySingleThread() throws Exception {
462
        int count = 1000;
463
        final RequestProcessor rp = new RequestProcessor("TestRP", 20);
464
        final CountDownLatch latch = new CountDownLatch(count);
465
        class C implements Callable<String> {
466
467
            volatile boolean hasRun;
468
            private final String result;
469
470
            C(String result) {
471
                this.result = result;
472
            }
473
474
            @Override
475
            public String call() throws Exception {
476
                latch.countDown();
477
                if (!"R17".equals(result)) {
478
                    try {
479
                        //Block all but one thread until threads have entered
480
                        latch.await();
481
                    } catch (InterruptedException ie) {
482
                    }
483
                }
484
                Thread.yield();
485
                hasRun = true;
486
                return result;
487
            }
488
        }
489
        List<C> l = new ArrayList<C>(count);
490
        Set<String> names = new HashSet<String>(count);
491
        for (int i = 0; i < count; i++) {
492
            String name = "R" + i;
493
            names.add(name);
494
            C c = new C(name);
495
            l.add(c);
496
        }
497
        String res = rp.invokeAny(l);
498
        assertNotNull(res);
499
        assertTrue(res.startsWith("R"));
500
        int runCount = 0;
501
        for (C c : l) {
502
            if (c.hasRun) {
503
                runCount++;
504
            }
505
        }
506
        assertTrue("Not all " + count + " threads should have completed, but " + runCount + " did.", runCount < count);
507
    }
508
509
    public void testInvokeAnyWithTimeout() throws Exception {
510
        int count = 20;
511
        final RequestProcessor rp = new RequestProcessor("TestRP", count + 1);
512
        final CountDownLatch latch = new CountDownLatch(1);
513
        class C implements Callable<String> {
514
515
            volatile boolean hasRun;
516
            private final String result;
517
518
            C(String result) {
519
                this.result = result;
520
            }
521
522
            @Override
523
            public String call() throws Exception {
524
                latch.await();
525
                if (Thread.interrupted()) {
526
                    return null;
527
                }
528
                hasRun = true;
529
                return result;
530
            }
531
        }
532
        List<C> l = new ArrayList<C>(count);
533
        Set<String> names = new HashSet<String>(count);
534
        for (int i = 0; i < count; i++) {
535
            String name = "R" + i;
536
            names.add(name);
537
            C c = new C(name);
538
            l.add(c);
539
        }
540
        //All threads are waiting on latch;  we should time out
541
        String res = rp.invokeAny(l, 400, TimeUnit.MILLISECONDS);
542
        assertNull(res);
543
        for (C c : l) {
544
            assertFalse(c.hasRun);
545
        }
546
    }
547
548
    public void testCancellation() throws Exception {
549
        final CountDownLatch latch = new CountDownLatch(1);
550
        class C implements Callable<String> {
551
552
            volatile boolean hasRun;
553
            volatile boolean interrupted;
554
555
            @Override
556
            public String call() throws Exception {
557
                try {
558
                    latch.await();
559
                } catch (InterruptedException e) {
560
                    interrupted = true;
561
                    return null;
562
                }
563
                if (Thread.interrupted()) {
564
                    interrupted = true;
565
                    return null;
566
                }
567
                hasRun = true;
568
                return "Hello";
569
            }
570
        }
571
        C c = new C();
572
        Future<String> f = RequestProcessor.getDefault().submit(c);
573
        f.cancel(true);
574
        latch.countDown();
575
        String s = null;
576
        try {
577
            s = f.get();
578
            fail("CancellationException should have been thrown");
579
        } catch (CancellationException e) {
580
        }
581
        assertNull(s);
582
        assertTrue(c.interrupted || !c.hasRun);
583
        assertFalse(c.hasRun);
584
    }
585
586
    public void testCancellablesGetCancelInvokedWithCallable() throws Exception {
587
        final CountDownLatch latch = new CountDownLatch(1);
588
        final CountDownLatch exit = new CountDownLatch(1);
589
        class C implements Callable<String>, Cancellable {
590
591
            volatile boolean hasRun;
592
            volatile boolean interrupted;
593
            volatile boolean cancelled;
594
595
            @Override
596
            public String call() throws Exception {
597
                try {
598
                    try {
599
                        latch.await();
600
                    } catch (InterruptedException e) {
601
                        interrupted = true;
602
                        return null;
603
                    }
604
                    if (Thread.interrupted()) {
605
                        interrupted = true;
606
                        return null;
607
                    }
608
                    if (cancelled) {
609
                        return null;
610
                    }
611
                    hasRun = true;
612
                    return "Hello";
613
                } finally {
614
                    exit.countDown();
615
                }
616
            }
617
618
            @Override
619
            public boolean cancel() {
620
                cancelled = true;
621
                exit.countDown();
622
                return true;
623
            }
624
        }
625
        C c = new C();
626
        Future<String> f = RequestProcessor.getDefault().submit(c);
627
        f.cancel(true);
628
        assertTrue (c.cancelled);
629
        latch.countDown();
630
        exit.await();
631
        String s = null;
632
        try {
633
            s = f.get();
634
            fail ("Should have gotten cancellation exception");
635
        } catch (CancellationException e) {
636
637
        }
638
    }
639
640
    public void testCancellablesGetCancelInvokedWithRunnable() throws Exception {
641
        final CountDownLatch latch = new CountDownLatch(1);
642
        final CountDownLatch exit = new CountDownLatch(1);
643
        class C implements Runnable, Cancellable {
644
645
            volatile boolean hasRun;
646
            volatile boolean interrupted;
647
            volatile boolean cancelled;
648
649
            @Override
650
            public void run() {
651
                try {
652
                    try {
653
                        latch.await();
654
                    } catch (InterruptedException e) {
655
                        interrupted = true;
656
                        return;
657
                    }
658
                    if (Thread.interrupted()) {
659
                        interrupted = true;
660
                        return;
661
                    }
662
                    if (cancelled) {
663
                        return;
664
                    }
665
                    hasRun = true;
666
                } finally {
667
                    exit.countDown();
668
                }
669
            }
670
671
            @Override
672
            public boolean cancel() {
673
                cancelled = true;
674
                exit.countDown();
675
                return true;
676
            }
677
        }
678
        C c = new C();
679
        Future<?> f = RequestProcessor.getDefault().submit(c);
680
        f.cancel(true);
681
        assertTrue (c.cancelled);
682
        latch.countDown();
683
        exit.await();
684
        try {
685
            f.get();
686
            fail ("Should have gotten cancellation exception");
687
        } catch (CancellationException e) {
688
689
        }
690
        assertFalse (c.hasRun);
691
    }
692
693
    public void testCancellablesThatSayTheyCantBeCancelledAreNotCancelledViaFutureDotCancel() throws Exception {
694
        final CountDownLatch latch = new CountDownLatch(1);
695
        final CountDownLatch exit = new CountDownLatch(1);
696
        class C implements Runnable, Cancellable {
697
698
            volatile boolean hasRun;
699
            volatile boolean interrupted;
700
            volatile boolean cancelCalled;
701
702
            @Override
703
            public void run() {
704
                try {
705
                    try {
706
                        latch.await();
707
                    } catch (InterruptedException e) {
708
                        interrupted = true;
709
                        throw new AssertionError(e);
710
                    }
711
                    if (Thread.interrupted()) {
712
                        interrupted = true;
713
                        throw new AssertionError("Thread should not have been interrupted");
714
                    }
715
                    hasRun = true;
716
                } finally {
717
                    exit.countDown();
718
                }
719
            }
720
721
            @Override
722
            public boolean cancel() {
723
                cancelCalled = true;
724
                return false;
725
            }
726
        }
727
        C c = new C();
728
        Future<?> f = RequestProcessor.getDefault().submit(c);
729
        f.cancel(true);
730
        assertFalse (f.isCancelled());
731
        assertTrue (c.cancelCalled);
732
        latch.countDown();
733
        exit.await();
734
        f.get();
735
        assertFalse (f.isCancelled());
736
        assertTrue (c.hasRun);
737
    }
738
739
    public void testInvokeAllCancellation() throws Exception {
740
        int count = 20;
741
        final CountDownLatch waitAll = new CountDownLatch(count);
742
        final RequestProcessor rp = new RequestProcessor("TestRP", count);
743
        class C implements Callable<String>, Cancellable {
744
745
            private final String result;
746
            volatile boolean cancelCalled;
747
748
            C(String result) {
749
                this.result = result;
750
            }
751
752
            @Override
753
            public String call() throws Exception {
754
                waitAll.countDown();
755
                return cancelCalled ? null : result;
756
            }
757
758
            @Override
759
            public boolean cancel() {
760
                cancelCalled = true;
761
                return false;
762
            }
763
        }
764
        List<C> l = new ArrayList<C>(count);
765
        List<Future<String>> fs;
766
        Set<String> names = new HashSet<String>(count);
767
        for (int i = 0; i < count; i++) {
768
            String name = "R" + i;
769
            names.add(name);
770
            C c = new C(name);
771
            l.add(c);
772
        }
773
        fs = rp.invokeAll(l);
774
        assertNotNull(fs);
775
        Set<String> s = new HashSet<String>(count);
776
        for (Future<String> f : fs) {
777
            s.add(f.get());
778
        }
779
        assertEquals(names, s);
780
    }
781
782
    public void testCannotScheduleLongerThanIntegerMaxValue() throws Exception {
783
        Runnable r = new Runnable() {
784
785
            @Override
786
            public void run() {
787
                fail ("Should not have been run");
788
            }
789
        };
790
        try {
791
            Future<?> f = RequestProcessor.getDefault().schedule(r, Long.MAX_VALUE, TimeUnit.DAYS);
792
            f.cancel(true);
793
        } catch (Exception e) {}
794
    }
795
796
    public void testCannotScheduleNegativeDelay() throws Exception {
797
        Runnable r = new Runnable() {
798
799
            @Override
800
            public void run() {
801
                fail ("Should not have been run");
802
            }
803
        };
804
        try {
805
            RequestProcessor.getDefault().schedule(r, -1L, TimeUnit.MILLISECONDS);
806
            fail ("Negative value accepetd");
807
        } catch (Exception e) {}
808
        try {
809
            RequestProcessor.getDefault().scheduleAtFixedRate(r, -1L, 22L, TimeUnit.MILLISECONDS);
810
            fail ("Negative value accepetd");
811
        } catch (Exception e) {}
812
        try {
813
            RequestProcessor.getDefault().scheduleAtFixedRate(r, 200, -22L, TimeUnit.MILLISECONDS);
814
            fail ("Negative value accepetd");
815
        } catch (Exception e) {}
816
        try {
817
            RequestProcessor.getDefault().scheduleWithFixedDelay(r, -1L, 22L, TimeUnit.MILLISECONDS);
818
            fail ("Negative value accepetd");
819
        } catch (Exception e) {}
820
        try {
821
            RequestProcessor.getDefault().scheduleWithFixedDelay(r, 1L, -22L, TimeUnit.MILLISECONDS);
822
            fail ("Negative value accepetd");
823
        } catch (Exception e) {}
824
    }
825
826
    public void testTaskCanRescheduleItself() throws Exception {
827
        final CountDownLatch latch = new CountDownLatch(2);
828
        class R implements Runnable {
829
            volatile RequestProcessor.Task task;
830
            volatile int runCount;
831
            @Override
832
            public void run() {
833
                runCount++;
834
                if (runCount == 1) {
835
                    task.schedule(0);
836
                }
837
                latch.countDown();
838
            }
839
        }
840
        R r = new R();
841
        RequestProcessor.Task t = RequestProcessor.getDefault().create(r);
842
        r.task = t;
843
        t.schedule(0);
844
        latch.await ();
845
        assertEquals (r.runCount, 2);
846
    }
847
848
    public void testScheduleRepeatingSanityFixedRate() throws Exception {
849
        final CountDownLatch latch = new CountDownLatch(5);
850
        class C implements Runnable {
851
            volatile int runCount;
852
            @Override
853
            public void run() {
854
                runCount++;
855
                latch.countDown();
856
            }
857
        }
858
        C c = new C();
859
        RequestProcessor.getDefault().scheduleWithFixedDelay(c, 0, 20, TimeUnit.MILLISECONDS);
860
//        latch.await(5000, TimeUnit.MILLISECONDS);
861
        latch.await();
862
        assertEquals (5, c.runCount);
863
    }
864
865
    public void testScheduleRepeatingSanityFixedDelay() throws Exception {
866
        final CountDownLatch latch = new CountDownLatch(5);
867
        class C implements Runnable {
868
            volatile int runCount;
869
            @Override
870
            public void run() {
871
                runCount++;
872
                latch.countDown();
873
            }
874
        }
875
        C c = new C();
876
        RequestProcessor.getDefault().scheduleAtFixedRate(c, 0, 20, TimeUnit.MILLISECONDS);
877
        latch.await(2000, TimeUnit.MILLISECONDS);
878
879
        assertEquals (5, c.runCount);
880
    }
881
882
    public void testScheduleOneShot() throws Exception {
883
        RequestProcessor rp = new RequestProcessor ("testScheduleOneShot", 5, true, true);
884
        try {
885
            class C implements Callable<String> {
886
                volatile long start = System.currentTimeMillis();
887
                private volatile long end;
888
889
                @Override
890
                public String call() throws Exception {
891
                    synchronized(this) {
892
                        end = System.currentTimeMillis();
893
                    }
894
                    return "Hello";
895
                }
896
897
                synchronized long elapsed() {
898
                    return end - start;
899
                }
900
            }
901
            C c = new C();
902
            int delay = 5000;
903
            //Use a 20 second timeout to have a reasonable chance of accuracy
904
            ScheduledFuture<String> f = rp.schedule(c, delay, TimeUnit.MILLISECONDS);
905
            assertEquals (5000, f.getDelay(TimeUnit.MILLISECONDS));
906
            assertNotNull(f.get());
907
            //Allow 4 seconds fudge-factor
908
            assertTrue (c.elapsed() > 4600);
909
            assertTrue (f.isDone());
910
        } finally {
911
            rp.stop();
912
        }
913
    }
914
915
    public void testScheduleRepeatingIntervalsAreRoughlyCorrect() throws Exception {
916
        int runCount = 5;
917
        final CountDownLatch latch = new CountDownLatch(runCount);
918
        final List<Long> intervals = Collections.synchronizedList(new ArrayList<Long> (runCount));
919
//        long initialDelay = 30000;
920
//        long period = 20000;
921
//        long fudgeFactor = 4000;
922
        long initialDelay = 3000;
923
        long period = 2000;
924
        long fudgeFactor = 400;
925
        long expectedInitialDelay = initialDelay - fudgeFactor;
926
        long expectedPeriod = period - fudgeFactor;
927
        class C implements Runnable {
928
            volatile long start = System.currentTimeMillis();
929
            private int runCount;
930
            @Override
931
            public void run() {
932
                runCount++;
933
                try {
934
                    synchronized(this) {
935
                        long end = System.currentTimeMillis();
936
                        intervals.add (end - start);
937
                        start = end;
938
                    }
939
                } finally {
940
                    latch.countDown();
941
                }
942
            }
943
        }
944
        C c = new C();
945
        RequestProcessor rp = new RequestProcessor ("testScheduleRepeating", 5, true);
946
        try {
947
            Future<?> f = rp.scheduleWithFixedDelay(c, initialDelay, period, TimeUnit.MILLISECONDS);
948
    //        latch.await(initialDelay + fudgeFactor + ((runCount - 1) * (period + fudgeFactor)), TimeUnit.MILLISECONDS); //XXX
949
            latch.await();
950
            f.cancel(true);
951
            for (int i= 0; i < Math.min(runCount, intervals.size()); i++) {
952
                long expect = i == 0 ? expectedInitialDelay : expectedPeriod;
953
                assertTrue ("Expected at least " + expect + " milliseconds before run " + i + " but was " + intervals.get(i), intervals.get(i) >= expect);
954
            }
955
            //Ensure we have really exited
956
            try {
957
                f.get();
958
                fail ("CancellationException should have been thrown");
959
            } catch (CancellationException e) {}
960
            assertTrue(f.isCancelled());
961
            assertTrue(f.isDone());
962
        } finally {
963
            rp.stop();
964
        }
965
    }
966
967
    public void testScheduleFixedRateAreRoughlyCorrect() throws Exception {
968
        int runCount = 5;
969
        final CountDownLatch latch = new CountDownLatch(runCount);
970
        final List<Long> intervals = Collections.synchronizedList(new ArrayList<Long> (runCount));
971
//        long initialDelay = 30000;
972
//        long period = 20000;
973
//        long fudgeFactor = 4000;
974
        long initialDelay = 3000;
975
        long period = 2000;
976
        long fudgeFactor = 400;
977
        long expectedInitialDelay = initialDelay - fudgeFactor;
978
        long expectedPeriod = period - fudgeFactor;
979
        class C implements Runnable {
980
            volatile long start = System.currentTimeMillis();
981
            private int runCount;
982
            @Override
983
            public void run() {
984
                runCount++;
985
                try {
986
                    synchronized(this) {
987
                        long end = System.currentTimeMillis();
988
                        intervals.add (end - start);
989
                        start = end;
990
                    }
991
                } finally {
992
                    latch.countDown();
993
                }
994
            }
995
        }
996
        C c = new C();
997
        RequestProcessor rp = new RequestProcessor ("testScheduleFixedRateAreRoughlyCorrect", 5, true);
998
        try {
999
            Future<?> f = rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS);
1000
            latch.await();
1001
            f.cancel(true);
1002
            for (int i= 0; i < Math.min(runCount, intervals.size()); i++) {
1003
                long expect = i == 0 ? expectedInitialDelay : expectedPeriod;
1004
                assertTrue ("Expected at least " + expect + " milliseconds before run " + i + " but was " + intervals.get(i), intervals.get(i) >= expect);
1005
            }
1006
            //Ensure we have really exited
1007
            try {
1008
                f.get();
1009
                fail ("CancellationException should have been thrown");
1010
            } catch (CancellationException e) {}
1011
            assertTrue(f.isCancelled());
1012
            assertTrue(f.isDone());
1013
        } finally {
1014
            rp.stop();
1015
        }
1016
    }
1017
1018
    public void testScheduleFixedRateOnMultiThreadPoolDoesNotCauseConcurrentExecution() throws Exception {
1019
        final AtomicInteger val = new AtomicInteger(0);
1020
        final CountDownLatch latch = new CountDownLatch(10);
1021
        class C implements Runnable {
1022
            boolean failed;
1023
            @Override
1024
            public void run() {
1025
                try {
1026
                    int now = val.incrementAndGet();
1027
                    if (now > 1) {
1028
                        failed = true;
1029
                        fail (now + " threads simultaneously in run()");
1030
                    }
1031
                    try {
1032
                        //Intentionally sleep *longer* than the interval
1033
                        //between executions.  We *want* to pile up all of the
1034
                        //RP threads entering run() - synchronization should
1035
                        //serialize them.  This test is to prove that this
1036
                        //method will never be called concurrently from two threads
1037
                        Thread.sleep(1000);
1038
                    } catch (InterruptedException ex) {
1039
1040
                    }
1041
                } finally {
1042
                    val.decrementAndGet();
1043
                    latch.countDown();
1044
                }
1045
            }
1046
        }
1047
        C c = new C();
1048
        long initialDelay = 2000;
1049
        long period = 10;
1050
        RequestProcessor rp = new RequestProcessor("testScheduleFixedRateOnMultiThreadPoolDoesNotCauseConcurrentExecution", 10, true);
1051
        rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS);
1052
        latch.await();
1053
        assertFalse(c.failed);
1054
        rp.stop();
1055
    }
1056
1057
    @RandomlyFails
1058
    public void testScheduleFixedRateWithShorterIntervalThanRunMethodTimeAreNotDelayed() throws Exception {
1059
        final CountDownLatch latch = new CountDownLatch(10);
1060
        final List<Long> intervals = new CopyOnWriteArrayList<Long>();
1061
        class C implements Runnable {
1062
            long start = Long.MIN_VALUE;
1063
1064
            @Override
1065
            public void run() {
1066
                long end = System.currentTimeMillis();
1067
                if (start != Long.MIN_VALUE) {
1068
                    intervals.add(end - start);
1069
                }
1070
                try {
1071
                    Thread.sleep(500);
1072
                } catch (InterruptedException ex) {
1073
                    
1074
                }
1075
                start = System.currentTimeMillis();
1076
                latch.countDown();
1077
            }
1078
        }
1079
        C c = new C();
1080
        long initialDelay = 100;
1081
        long period = 100;
1082
        RequestProcessor rp = new RequestProcessor("testScheduleFixedRateWithShorterIntervalThanRunMethodTimeAreNotDelayed", 10, true);
1083
        ScheduledFuture<?> f = rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS);
1084
        latch.await();
1085
        f.cancel(true);
1086
        rp.stop();
1087
        int max = intervals.size();
1088
        for (int i= 0; i < max; i++) {
1089
            long iv = intervals.get(i);
1090
            assertFalse ("Interval " + i + " should have been at least less than requested interval * 1.5 with fixed rate" + iv, iv > 150);
1091
        }
1092
    }
1093
1094
    public void testCancelFutureInterruptsThreadEvenIfRequestProcessorForbidsIt() throws Exception {
1095
        RequestProcessor rp = new RequestProcessor ("X", 3, false, true);
1096
        final CountDownLatch releaseForRun = new CountDownLatch(1);
1097
        final CountDownLatch enterLatch = new CountDownLatch(1);
1098
        final CountDownLatch exitLatch = new CountDownLatch(1);
1099
        class R implements Runnable {
1100
            volatile boolean interrupted;
1101
            @Override
1102
            public void run() {
1103
                enterLatch.countDown();
1104
                try {
1105
                    releaseForRun.await();
1106
                } catch (InterruptedException ex) {
1107
                    interrupted = true;
1108
                }
1109
                interrupted |= Thread.interrupted();
1110
                exitLatch.countDown();
1111
            }
1112
        }
1113
        R r = new R();
1114
        Future<?> f = rp.submit(r);
1115
        enterLatch.await();
1116
        f.cancel(true);
1117
        assertTrue (f.isCancelled());
1118
        exitLatch.await();
1119
        assertTrue (r.interrupted);
1120
    }
1121
1122
    public void testCancelDoesNotInterruptIfNotPassedToFutureDotCancel() throws Exception {
1123
        RequestProcessor rp = new RequestProcessor ("X", 3, false, true);
1124
        final CountDownLatch releaseForRun = new CountDownLatch(1);
1125
        final CountDownLatch enterLatch = new CountDownLatch(1);
1126
        final CountDownLatch exitLatch = new CountDownLatch(1);
1127
        class R implements Runnable {
1128
            volatile boolean interrupted;
1129
            @Override
1130
            public void run() {
1131
                enterLatch.countDown();
1132
                try {
1133
                    releaseForRun.await();
1134
                } catch (InterruptedException ex) {
1135
                    interrupted = true;
1136
                }
1137
                interrupted |= Thread.interrupted();
1138
                exitLatch.countDown();
1139
            }
1140
        }
1141
        R r = new R();
1142
        Future<?> f = rp.submit(r);
1143
        enterLatch.await();
1144
        f.cancel(false);
1145
        assertTrue (f.isCancelled());
1146
        assertFalse (r.interrupted);
1147
    }
1148
1149
    public void testCancelDoesInterruptIfRequestProcessorSpecifiesItEvenIfFalsePassedToFutureDotCancel() throws Exception {
1150
        RequestProcessor rp = new RequestProcessor ("X", 3, true, true);
1151
        final CountDownLatch releaseForRun = new CountDownLatch(1);
1152
        final CountDownLatch enterLatch = new CountDownLatch(1);
1153
        final CountDownLatch exitLatch = new CountDownLatch(1);
1154
        class R implements Runnable {
1155
            volatile boolean interrupted;
1156
            @Override
1157
            public void run() {
1158
                enterLatch.countDown();
1159
                try {
1160
                    releaseForRun.await();
1161
                } catch (InterruptedException ex) {
1162
                    interrupted = true;
1163
                }
1164
                interrupted |= Thread.interrupted();
1165
                exitLatch.countDown();
1166
            }
1167
        }
1168
        R r = new R();
1169
        Future<?> f = rp.submit(r);
1170
        enterLatch.await();
1171
        f.cancel(false);
1172
        assertTrue (f.isCancelled());
1173
        exitLatch.await();
1174
        assertTrue (r.interrupted);
1175
    }
1176
}

Return to bug 180386