Lines 41-55
Link Here
|
41 |
|
41 |
|
42 |
package org.openide.util; |
42 |
package org.openide.util; |
43 |
|
43 |
|
|
|
44 |
import java.util.ArrayList; |
44 |
import java.util.Arrays; |
45 |
import java.util.Arrays; |
|
|
46 |
import java.util.Collection; |
45 |
import java.util.HashSet; |
47 |
import java.util.HashSet; |
46 |
import java.util.LinkedList; |
48 |
import java.util.LinkedList; |
47 |
import java.util.List; |
49 |
import java.util.List; |
48 |
import java.util.ListIterator; |
50 |
import java.util.ListIterator; |
|
|
51 |
import java.util.Set; |
49 |
import java.util.Stack; |
52 |
import java.util.Stack; |
50 |
import java.util.Timer; |
53 |
import java.util.Timer; |
51 |
import java.util.TimerTask; |
54 |
import java.util.TimerTask; |
|
|
55 |
import java.util.concurrent.Callable; |
56 |
import java.util.concurrent.CancellationException; |
57 |
import java.util.concurrent.CountDownLatch; |
58 |
import java.util.concurrent.Delayed; |
59 |
import java.util.concurrent.ExecutionException; |
52 |
import java.util.concurrent.Executor; |
60 |
import java.util.concurrent.Executor; |
|
|
61 |
import java.util.concurrent.Future; |
62 |
import java.util.concurrent.FutureTask; |
63 |
import java.util.concurrent.RejectedExecutionException; |
64 |
import java.util.concurrent.ScheduledExecutorService; |
65 |
import java.util.concurrent.ScheduledFuture; |
66 |
import java.util.concurrent.TimeUnit; |
67 |
import java.util.concurrent.TimeoutException; |
68 |
import java.util.concurrent.atomic.AtomicBoolean; |
69 |
import java.util.concurrent.atomic.AtomicLong; |
70 |
import java.util.concurrent.atomic.AtomicReference; |
53 |
import java.util.logging.Level; |
71 |
import java.util.logging.Level; |
54 |
import java.util.logging.Logger; |
72 |
import java.util.logging.Logger; |
55 |
|
73 |
|
Lines 157-163
Link Here
|
157 |
* |
175 |
* |
158 |
* @author Petr Nejedly, Jaroslav Tulach |
176 |
* @author Petr Nejedly, Jaroslav Tulach |
159 |
*/ |
177 |
*/ |
160 |
public final class RequestProcessor implements Executor { |
178 |
public final class RequestProcessor implements Executor, ScheduledExecutorService { |
161 |
/** the static instance for users that do not want to have own processor */ |
179 |
/** the static instance for users that do not want to have own processor */ |
162 |
private static RequestProcessor DEFAULT = new RequestProcessor(); |
180 |
private static RequestProcessor DEFAULT = new RequestProcessor(); |
163 |
|
181 |
|
Lines 186-192
Link Here
|
186 |
|
204 |
|
187 |
/** If the RP was stopped, this variable will be set, every new post() |
205 |
/** If the RP was stopped, this variable will be set, every new post() |
188 |
* will throw an exception and no task will be processed any further */ |
206 |
* will throw an exception and no task will be processed any further */ |
189 |
boolean stopped = false; |
207 |
volatile boolean stopped = false; |
190 |
|
208 |
|
191 |
/** The lock covering following four fields. They should be accessed |
209 |
/** The lock covering following four fields. They should be accessed |
192 |
* only while having this lock held. */ |
210 |
* only while having this lock held. */ |
Lines 585-590
Link Here
|
585 |
} |
603 |
} |
586 |
} |
604 |
} |
587 |
|
605 |
|
|
|
606 |
/** |
607 |
* Shut down this thread pool as defined in |
608 |
* {@link java.util.concurrent.ExecutorService#shutdown()}.<p/>If this method |
609 |
* is called on the <a href="#getDefault()">default RequestProcessor</a>, |
610 |
* throws an IllegalStateException. |
611 |
* @since 8.2 |
612 |
*/ |
613 |
@Override |
614 |
public void shutdown() { |
615 |
if (this == UNLIMITED) { |
616 |
throw new IllegalStateException ("Cannot shut down the default " + //NOI18N |
617 |
"request processor"); //NOI18N |
618 |
} |
619 |
stop(); |
620 |
} |
621 |
|
622 |
/** |
623 |
* Shut down this thread pool as defined in |
624 |
* {@link java.util.concurrent.ExecutorService#shutdownNow()}, returning |
625 |
* a list of un-run or unfinished runnables. <p/> If this method |
626 |
* is called on the <a href="#getDefault()">default RequestProcessor</a>, |
627 |
* throws an IllegalStateException. |
628 |
* @since 8.2 |
629 |
*/ |
630 |
@Override |
631 |
public List<Runnable> shutdownNow() { |
632 |
if (this == UNLIMITED) { |
633 |
throw new IllegalStateException ("Cannot shut down the default " + //NOI18N |
634 |
"request processor"); //NOI18N |
635 |
} |
636 |
//XXX more aggressive shutdown? |
637 |
stop(); |
638 |
synchronized (processorLock) { |
639 |
List<Runnable> result = new ArrayList<Runnable>(processors.size()); |
640 |
for (Processor p : processors) { |
641 |
if (p != null && p.todo != null && p.todo.run != null) { |
642 |
Runnable r = p.todo.run; |
643 |
if (r instanceof RunnableWrapper) { |
644 |
Runnable other = ((RunnableWrapper) r).getRunnable(); |
645 |
r = other == null ? r : other; |
646 |
} |
647 |
result.add(r); |
648 |
} |
649 |
} |
650 |
return result; |
651 |
} |
652 |
} |
653 |
|
654 |
/** |
655 |
* Determine if this thread pool has been shut down, in accordance with |
656 |
* the specification of {@link java.util.concurrent.ExecutorService#shutdown()} |
657 |
* @return |
658 |
* @since 8.2 |
659 |
*/ |
660 |
@Override |
661 |
public boolean isShutdown() { |
662 |
return stopped; |
663 |
} |
664 |
|
665 |
/** |
666 |
* Determine if this thread pool has been terminated, in accordance with |
667 |
* the specification of {@link java.util.concurrent.ExecutorService#isTerminated()} |
668 |
* @return |
669 |
* @since 8.2 |
670 |
*/ |
671 |
@Override |
672 |
public boolean isTerminated() { |
673 |
boolean result = true; |
674 |
Set<Processor> set = collectProcessors(new HashSet<Processor>()); |
675 |
for (Processor p : set) { |
676 |
if (p.isAlive() && p.belongsTo(this)) { |
677 |
result = false; |
678 |
break; |
679 |
} |
680 |
} |
681 |
return result; |
682 |
} |
683 |
|
684 |
/** |
685 |
* Determine if this thread pool has been shut down, in accordance with |
686 |
* the specification of {@link java.util.concurrent.ExecutorService#shutdown()} |
687 |
* @return True if the request processor is shut down before the timeout |
688 |
* @since 8.2 |
689 |
*/ |
690 |
@Override |
691 |
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { |
692 |
Parameters.notNull("unit", unit); //NOI18N |
693 |
long timeoutMillis = unit.convert(timeout, TimeUnit.MILLISECONDS); |
694 |
boolean result = stopped; |
695 |
long doneTime = System.currentTimeMillis() + timeoutMillis; |
696 |
Set<Processor> procs = new HashSet<Processor>(); |
697 |
outer: do { |
698 |
procs = collectProcessors(procs); |
699 |
if (procs.isEmpty()) { |
700 |
return true; |
701 |
} |
702 |
for (Processor p : procs) { |
703 |
long remaining = doneTime - System.currentTimeMillis(); |
704 |
if (remaining <= 0) { |
705 |
result = collectProcessors(procs).isEmpty(); |
706 |
break outer; |
707 |
} |
708 |
if (p.belongsTo(this)) { |
709 |
p.join(remaining); |
710 |
} |
711 |
result = !p.isAlive() || !p.belongsTo(this); |
712 |
} |
713 |
procs.clear(); |
714 |
} while (!procs.isEmpty()); |
715 |
return result; |
716 |
} |
717 |
|
718 |
private Set<Processor> collectProcessors (Set<Processor> procs) { |
719 |
procs.clear(); |
720 |
synchronized (processorLock) { |
721 |
for (Processor p : processors) { |
722 |
if (p.belongsTo(this)) { |
723 |
procs.add(p); |
724 |
} |
725 |
} |
726 |
} |
727 |
return procs; |
728 |
} |
729 |
|
730 |
/** |
731 |
* Submit a job to be run, and get back a {@link java.util.concurrent.Future} |
732 |
* which can be waited on and asked for a result, following the contract of |
733 |
* {@link java.util.concurrent.ExecutorService#submit(java.util.concurrent.Callable)} |
734 |
* <p/> |
735 |
* <b>Note:</b> If the passed {@link java.util.concurrent.Callable} implements |
736 |
* {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()} |
737 |
* method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked. |
738 |
* If <code>Cancellable.cancel()</code> returns false, then <i>the job will <u>not</u> be |
739 |
* cancelled</i>. |
740 |
* |
741 |
* @param <T> The return type of the passed job |
742 |
* @param task The work to do |
743 |
* @return A Future that can indicate work status and fetch a result |
744 |
* @throws RejectedExecutionException if this RequestProcessor has been |
745 |
* shut down |
746 |
* @since 8.2 |
747 |
*/ |
748 |
@Override |
749 |
public <T> Future<T> submit(Callable<T> task) { |
750 |
Parameters.notNull("task", task); //NOI18N |
751 |
if (stopped) { |
752 |
throw new RejectedExecutionException("Request Processor already " + //NOI18N |
753 |
"stopped"); //NOI18N |
754 |
} |
755 |
RPFutureTask<T> result = new RPFutureTask<T>(task); |
756 |
Task t = create(result); |
757 |
result.setTask(t); |
758 |
t.schedule(0); |
759 |
return result; |
760 |
} |
761 |
/** |
762 |
* Submit a job to be run, and get back a {@link java.util.concurrent.Future} |
763 |
* which can be waited on and asked for a result, following the contract of |
764 |
* {@link java.util.concurrent.ExecutorService#submit(java.lang.Runnable, T)} |
765 |
* <p/> |
766 |
* <b>Note:</b> If the passed {@link java.lang.Runnable} implements |
767 |
* {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()} |
768 |
* method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked. |
769 |
* If <code>Cancellable.cancel()</code> returns false, then <i>the job will <u>not</u> be |
770 |
* cancelled</i>. |
771 |
* |
772 |
* @param <T> The return type of the passed job |
773 |
* @param task The work to do |
774 |
* @param predefinedResult The return value for the resulting Future's |
775 |
* <code>get()</code> method |
776 |
* @return A Future that can indicate work status and fetch the result |
777 |
* @throws RejectedExecutionException if this RequestProcessor has been |
778 |
* shut down |
779 |
* @since 8.2 |
780 |
*/ |
781 |
@Override |
782 |
public <T> Future<T> submit(Runnable task, T predefinedResult) { |
783 |
Parameters.notNull("task", task); //NOI18N |
784 |
if (stopped) { |
785 |
throw new RejectedExecutionException("Request Processor already " + //NOI18N |
786 |
"stopped"); //NOI18N |
787 |
} |
788 |
RPFutureTask<T> result = new RPFutureTask<T>(task, predefinedResult); |
789 |
Task t = create(result); |
790 |
result.setTask(t); |
791 |
t.schedule(0); |
792 |
return result; |
793 |
} |
794 |
|
795 |
/** |
796 |
* Submit a job to be run, and get back a {@link java.util.concurrent.Future} |
797 |
* which can be waited on and asked for a result, following the contract of |
798 |
* {@link java.util.concurrent.ExecutorService#submit(java.lang.Runnable)} |
799 |
* <p/> |
800 |
* <b>Note:</b> If the passed {@link java.lang.Runnable} implements |
801 |
* {@link org.openide.util.Cancellable}, then that object's {@link org.openide.util.Cancellable#cancel()} |
802 |
* method will be called if {@link java.util.concurrent.Future#cancel(boolean)} is invoked. |
803 |
* If <code>Cancellable.cancel()</code> returns false, then <i>the job will <u>not</u> be |
804 |
* cancelled</i>. |
805 |
* |
806 |
* @param <T> The return type of the passed job |
807 |
* @param task The work to do |
808 |
* @return A Future that can indicate work status and fetch a result |
809 |
* @throws RejectedExecutionException if this RequestProcessor has been |
810 |
* shut down |
811 |
* @since 8.2 |
812 |
*/ |
813 |
@Override |
814 |
public Future<?> submit(Runnable task) { |
815 |
return this.<Void>submit (task, null); |
816 |
} |
817 |
|
818 |
/** |
819 |
* Invoke a collection of tasks, as defined in the specification of |
820 |
* {@link java.util.concurrent.ExecutorService#invokeAll(java.util.Collection)} |
821 |
* @param <T> The type of returned object |
822 |
* @param tasks A collection of Callables with the same return type |
823 |
* @return A list of futures which can be monitored for completion |
824 |
* @throws InterruptedException If the timeout expires |
825 |
* @since 8.2 |
826 |
*/ |
827 |
@Override |
828 |
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { |
829 |
Parameters.notNull("tasks", tasks); //NOI18N |
830 |
List<Future<T>> result = new ArrayList<Future<T>>(tasks.size()); |
831 |
CountDownLatch wait = new CountDownLatch(tasks.size()); |
832 |
for (Callable<T> c : tasks) { |
833 |
if (c == null) { |
834 |
throw new NullPointerException ("Contains null tasks: " + //NOI18N |
835 |
tasks); |
836 |
} |
837 |
Callable<T> delegate = new WaitableCallable<T>(c, wait); |
838 |
result.add (submit(delegate)); |
839 |
} |
840 |
wait.await(); |
841 |
return result; |
842 |
} |
843 |
|
844 |
/** |
845 |
* Executes the given tasks, returning a list of Futures holding their |
846 |
* status and results when all complete or the timeout expires, whichever |
847 |
* happens first, as specified in |
848 |
* {@link java.util.concurrent.ExecutorService#invokeAll((java.util.Collection,long,java.util.concurrent.TimeUnit))} |
849 |
* @param <T> The result type |
850 |
* @param tasks A collection of callables |
851 |
* @param timeout The maximum time to wait for completion, in the specified time units |
852 |
* @param unit The time unit |
853 |
* @return A list of futures |
854 |
* @throws InterruptedException if the timeout expires |
855 |
* @since 8.2 |
856 |
*/ |
857 |
@Override |
858 |
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { |
859 |
Parameters.notNull("unit", unit); //NOI18N |
860 |
Parameters.notNull("tasks", tasks); //NOI18N |
861 |
CountDownLatch wait = new CountDownLatch(tasks.size()); |
862 |
List<Future<T>> result = new ArrayList<Future<T>>(tasks.size()); |
863 |
for (Callable<T> c : tasks) { |
864 |
if (c == null) { |
865 |
throw new NullPointerException ("Contains null tasks: " + tasks); //NOI18N |
866 |
} |
867 |
Callable<T> delegate = new WaitableCallable<T>(c, wait); |
868 |
result.add (submit(delegate)); |
869 |
} |
870 |
if (!wait.await(timeout, unit)) { |
871 |
for (Future<T> f : result) { |
872 |
RPFutureTask<?> ft = (RPFutureTask<?>) f; |
873 |
ft.cancel(true); |
874 |
} |
875 |
} |
876 |
return result; |
877 |
} |
878 |
/** |
879 |
* Executes the given tasks, returning the result of one which has |
880 |
* completed and cancelling any incomplete tasks, as specified in |
881 |
* {@link java.util.concurrent.ExecutorService#invokeAny((java.util.Collection))} |
882 |
* @param <T> The result type |
883 |
* @param tasks A collection of callables |
884 |
* @return A list of futures |
885 |
* @throws InterruptedException if execution is interrupted |
886 |
* @since 8.2 |
887 |
*/ |
888 |
@Override |
889 |
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { |
890 |
Parameters.notNull("tasks", tasks); //NOI18N |
891 |
CountDownLatch wait = new CountDownLatch(1); |
892 |
List<Future<T>> result = new ArrayList<Future<T>>(tasks.size()); |
893 |
AtomicReference<T> ref = new AtomicReference<T>(); |
894 |
try { |
895 |
for (Callable<T> c : tasks) { |
896 |
if (c == null) { |
897 |
throw new NullPointerException ("Contains null tasks: " + //NOI18N |
898 |
tasks); |
899 |
} |
900 |
Callable<T> delegate = new WaitableCallable<T>(c, ref, wait); |
901 |
result.add (submit(delegate)); |
902 |
} |
903 |
wait.await(); |
904 |
} finally { |
905 |
for (Future<T> f : result) { |
906 |
RPFutureTask<?> ft = (RPFutureTask<?>) f; |
907 |
ft.cancel(true); |
908 |
} |
909 |
} |
910 |
return ref.get(); |
911 |
} |
912 |
/** |
913 |
* Executes the given tasks, returning a list of Futures holding their |
914 |
* status and results when all complete or the timeout expires, whichever |
915 |
* happens first, as specified in |
916 |
* {@link java.util.concurrent.ExecutorService#invokeAny((java.util.Collection))} |
917 |
* @param <T> The result type |
918 |
* @param tasks A collection of callables |
919 |
* @param timeout The maximum time to wait for completion, in the specified time units |
920 |
* @param unit The time unit |
921 |
* @return A list of futures |
922 |
* @throws InterruptedException if the timeout expires or execution is interrupted |
923 |
* @since 8.2 |
924 |
*/ |
925 |
@Override |
926 |
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { |
927 |
Parameters.notNull("unit", unit); //NOI18N |
928 |
Parameters.notNull("tasks", tasks); //NOI18N |
929 |
CountDownLatch wait = new CountDownLatch(1); |
930 |
List<Future<T>> result = new ArrayList<Future<T>>(tasks.size()); |
931 |
AtomicReference<T> ref = new AtomicReference<T>(); |
932 |
try { |
933 |
for (Callable<T> c : tasks) { |
934 |
if (c == null) { |
935 |
throw new NullPointerException ("Contains null tasks: " + //NOI18N |
936 |
tasks); |
937 |
} |
938 |
Callable<T> delegate = new WaitableCallable(c, ref, wait); |
939 |
result.add (submit(delegate)); |
940 |
} |
941 |
wait.await(timeout, unit); |
942 |
} finally { |
943 |
for (Future<T> f : result) { |
944 |
RPFutureTask ft = (RPFutureTask) f; |
945 |
ft.cancel(true); |
946 |
} |
947 |
} |
948 |
return ref.get(); |
949 |
} |
950 |
|
951 |
/** |
952 |
* Schedule a runnable to execute after the specified delay. |
953 |
* @param command The runnable |
954 |
* @param delay The delay |
955 |
* @param unit The time units of the delay |
956 |
* @return A future which can be monitored for completion |
957 |
* @since 8.2 |
958 |
*/ |
959 |
@Override |
960 |
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { |
961 |
Parameters.notNull("command", command); //NOI18N |
962 |
Parameters.notNull("unit", unit); //NOI18N |
963 |
if (delay < 0) { |
964 |
throw new IllegalArgumentException ("Negative delay: " + delay); |
965 |
} |
966 |
if (stopped) { |
967 |
throw new RejectedExecutionException("Request Processor already stopped"); //NOI18N |
968 |
} |
969 |
long delayMillis = unit.convert(delay, TimeUnit.MILLISECONDS); |
970 |
if (delayMillis > Integer.MAX_VALUE) { |
971 |
throw new IllegalArgumentException ("Requested delay " + delayMillis + //NOI18N |
972 |
" is > Integer.MAX_VALUE"); //NOI18N |
973 |
} |
974 |
ScheduledRPFutureTask<Void> result = new ScheduledRPFutureTask<Void>(command, null, delayMillis); |
975 |
Task t = create(result); |
976 |
result.setTask(t); |
977 |
t.schedule((int) delayMillis); |
978 |
return result; |
979 |
} |
980 |
/** |
981 |
* Schedule a {@link java.util.concurrent.Callable} to execute on another thread |
982 |
* after the specified delay. |
983 |
* @param command The work to run |
984 |
* @param delay The delay |
985 |
* @param unit The time units of the delay |
986 |
* @return A future which can be monitored for completion |
987 |
* @since 8.2 |
988 |
*/ |
989 |
@Override |
990 |
public <T> ScheduledFuture<T> schedule(Callable<T> callable, long delay, TimeUnit unit) { |
991 |
Parameters.notNull("unit", unit); //NOI18N |
992 |
Parameters.notNull("callable", callable); //NOI18N |
993 |
if (delay < 0) { |
994 |
throw new IllegalArgumentException ("Negative delay: " + delay); |
995 |
} |
996 |
if (stopped) { |
997 |
throw new RejectedExecutionException("Request Processor already " + //NOI18N |
998 |
"stopped"); //NOI18N |
999 |
} |
1000 |
long delayMillis = unit.convert(delay, TimeUnit.MILLISECONDS); |
1001 |
if (delayMillis > Integer.MAX_VALUE) { |
1002 |
throw new IllegalArgumentException ("Requested delay " + //NOI18N |
1003 |
delayMillis + " is > Integer.MAX_VALUE"); //NOI18N |
1004 |
} |
1005 |
ScheduledRPFutureTask<T> result = new ScheduledRPFutureTask<T>(callable, delayMillis); |
1006 |
Task t = create(result); |
1007 |
result.setTask(t); |
1008 |
t.schedule((int) delayMillis); |
1009 |
return result; |
1010 |
} |
1011 |
|
1012 |
/** |
1013 |
* Schedule a runnable which will run with a given frequency, regardless |
1014 |
* of how long execution takes, with the exception that if execution takes |
1015 |
* longer than the specified delay, execution will be delayed but will |
1016 |
* never be run on two threads concurrently. |
1017 |
* As specified in |
1018 |
* {@link java.util.concurrent.ExecutorService.scheduleAtFixedRate(java.lang.Runnable,long,long,java.util.concurrent.TimeUnit)} |
1019 |
* @param command The runnable |
1020 |
* @param initialDelay The delay before first running |
1021 |
* @param period The frequency with which the runnable should run after the |
1022 |
* first run |
1023 |
* @param unit The time units in which the initial delay and period are |
1024 |
* specified |
1025 |
* @return A future which can be monitored for completion, or cancelled |
1026 |
* @since 8.2 |
1027 |
*/ |
1028 |
@Override |
1029 |
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { |
1030 |
return scheduleFixed(command, initialDelay, period, unit, false); |
1031 |
} |
1032 |
|
1033 |
/** |
1034 |
* Schedule a runnable which will run repeatedly after the specified initial |
1035 |
* delay, with the specified delay between the completion of one run and |
1036 |
* the start of the next. |
1037 |
* As specified in |
1038 |
* {@link java.util.concurrent.ExecutorService.scheduleWithFixedDelay(java.lang.Runnable,long,long,java.util.concurrent.TimeUnit)} |
1039 |
* @param command The runnable |
1040 |
* @param initialDelay The delay before first run |
1041 |
* @param delay The delay between runs |
1042 |
* @param unit The time units of the delay parameters |
1043 |
* @return A future which can be monitored for completion, or cancelled |
1044 |
* @since 8.2 |
1045 |
*/ |
1046 |
@Override |
1047 |
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { |
1048 |
return scheduleFixed(command, initialDelay, delay, unit, true); |
1049 |
} |
1050 |
|
1051 |
private ScheduledFuture<?> scheduleFixed (Runnable command, long initialDelay, long period, TimeUnit unit, boolean fixedDelay) { |
1052 |
Parameters.notNull("unit", unit); //NOI18N |
1053 |
Parameters.notNull("command", command); //NOI18N |
1054 |
if (period < 0) { |
1055 |
throw new IllegalArgumentException ("Negative delay: " + period); //NOI18N |
1056 |
} |
1057 |
if (initialDelay < 0) { |
1058 |
throw new IllegalArgumentException ("Negative initialDelay: " //NOI18N |
1059 |
+ initialDelay); |
1060 |
} |
1061 |
if (stopped) { |
1062 |
throw new RejectedExecutionException("Request Processor already " + //NOI18N |
1063 |
"stopped"); //NOI18N |
1064 |
} |
1065 |
long initialDelayMillis = unit.convert(initialDelay, TimeUnit.MILLISECONDS); |
1066 |
if (initialDelayMillis > Integer.MAX_VALUE) { |
1067 |
throw new IllegalArgumentException("Initial delay > " + //NOI18N |
1068 |
"Integer.MAX_VALUE milliseconds: " + initialDelayMillis); //NOI18N |
1069 |
} |
1070 |
long periodMillis = unit.convert(period, TimeUnit.MILLISECONDS); |
1071 |
if (periodMillis > Integer.MAX_VALUE) { |
1072 |
throw new IllegalArgumentException("Initial delay > " + //NOI18N |
1073 |
"Integer.MAX_VALUE milliseconds: " + periodMillis); //NOI18N |
1074 |
} |
1075 |
|
1076 |
TaskFutureWrapper wrap = fixedDelay ? |
1077 |
new FixedDelayTask(command, initialDelayMillis, periodMillis) : |
1078 |
new FixedRateTask(command, initialDelay, periodMillis); |
1079 |
Task t = create(wrap); |
1080 |
wrap.t = t; |
1081 |
t.cancelled = wrap.cancelled; |
1082 |
t.schedule ((int) initialDelayMillis); |
1083 |
|
1084 |
return wrap; |
1085 |
} |
1086 |
|
1087 |
private static abstract class TaskFutureWrapper implements ScheduledFuture<Void>, Runnable, RunnableWrapper { |
1088 |
volatile Task t; |
1089 |
protected final Runnable toRun; |
1090 |
protected final long initialDelay; |
1091 |
protected final long period; |
1092 |
final AtomicBoolean cancelled = new AtomicBoolean(); |
1093 |
TaskFutureWrapper(Runnable run, long initialDelay, long period) { |
1094 |
this.toRun = run; |
1095 |
this.initialDelay = initialDelay; |
1096 |
this.period = period; |
1097 |
} |
1098 |
|
1099 |
@Override |
1100 |
public final Runnable getRunnable() { |
1101 |
return toRun; |
1102 |
} |
1103 |
|
1104 |
@Override |
1105 |
public int compareTo(Delayed o) { |
1106 |
long other = o.getDelay(TimeUnit.MILLISECONDS); |
1107 |
long ours = getDelay(TimeUnit.MILLISECONDS); |
1108 |
//Might overflow on, say, ms compared to Long.MAX_VALUE, TimeUnit.DAYS |
1109 |
return (int) (ours - other); |
1110 |
} |
1111 |
|
1112 |
@Override |
1113 |
public boolean cancel(boolean mayInterruptIfRunning) { |
1114 |
boolean result = true; |
1115 |
if (toRun instanceof Cancellable) { |
1116 |
result = ((Cancellable) toRun).cancel(); |
1117 |
} |
1118 |
if (result) { |
1119 |
//will invoke cancelled.set(true) |
1120 |
result = t.cancel(mayInterruptIfRunning); |
1121 |
} |
1122 |
return result; |
1123 |
} |
1124 |
|
1125 |
@Override |
1126 |
public boolean isCancelled() { |
1127 |
return cancelled.get(); |
1128 |
} |
1129 |
|
1130 |
@Override |
1131 |
public boolean isDone() { |
1132 |
return cancelled.get() || t.isFinished(); |
1133 |
} |
1134 |
|
1135 |
@Override |
1136 |
public Void get() throws InterruptedException, ExecutionException { |
1137 |
if (cancelled.get()) { |
1138 |
throw new CancellationException(); |
1139 |
} |
1140 |
t.waitFinished(); |
1141 |
if (cancelled.get()) { |
1142 |
throw new CancellationException(); |
1143 |
} |
1144 |
return null; |
1145 |
} |
1146 |
|
1147 |
@Override |
1148 |
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { |
1149 |
if (cancelled.get()) { |
1150 |
throw new CancellationException(); |
1151 |
} |
1152 |
long millis = unit.convert(timeout, TimeUnit.MILLISECONDS); |
1153 |
t.waitFinished(millis); |
1154 |
if (cancelled.get()) { |
1155 |
throw new CancellationException(); |
1156 |
} |
1157 |
return null; |
1158 |
} |
1159 |
} |
1160 |
|
1161 |
private static final class FixedRateTask extends TaskFutureWrapper { |
1162 |
private final Object runLock = new Object(); |
1163 |
private final Object timeLock = new Object(); |
1164 |
//must be accessed holding timeLock |
1165 |
private int runCount; |
1166 |
private long nextRunTime; |
1167 |
private long start = Long.MIN_VALUE; |
1168 |
volatile boolean firstRun = true; |
1169 |
FixedRateTask (Runnable run, long initialDelay, long period) { |
1170 |
super (run, initialDelay, period); |
1171 |
} |
1172 |
|
1173 |
@Override |
1174 |
public void run() { |
1175 |
if (firstRun) { |
1176 |
synchronized (timeLock) { |
1177 |
start = System.currentTimeMillis(); |
1178 |
firstRun = false; |
1179 |
} |
1180 |
} |
1181 |
try { |
1182 |
synchronized(runLock) { |
1183 |
toRun.run(); |
1184 |
} |
1185 |
} catch (RuntimeException e) { |
1186 |
cancel(true); |
1187 |
throw e; |
1188 |
} |
1189 |
reschedule(); |
1190 |
} |
1191 |
|
1192 |
private void reschedule() { |
1193 |
//All access to nextRunTime & runCount under lock. |
1194 |
long interval; |
1195 |
synchronized (timeLock) { |
1196 |
nextRunTime = start + (initialDelay + period * runCount); |
1197 |
runCount++; |
1198 |
interval = Math.max(0, nextRunTime - System.currentTimeMillis()); |
1199 |
if (interval > Integer.MAX_VALUE) { |
1200 |
throw new IllegalStateException ("Interval > Integer.MAX_VALUE: " + interval); //NOI18N |
1201 |
} |
1202 |
} |
1203 |
boolean canContinue = !cancelled.get() && !Thread.currentThread().isInterrupted(); |
1204 |
if (canContinue) { |
1205 |
t.schedule((int) interval); |
1206 |
} |
1207 |
} |
1208 |
|
1209 |
@Override |
1210 |
public long getDelay(TimeUnit unit) { |
1211 |
if (isCancelled()) { |
1212 |
return Long.MAX_VALUE; |
1213 |
} |
1214 |
long delay; |
1215 |
synchronized (timeLock) { |
1216 |
delay = Math.min(0, nextRunTime - System.currentTimeMillis()); |
1217 |
} |
1218 |
return unit.convert(delay, TimeUnit.MILLISECONDS); |
1219 |
} |
1220 |
} |
1221 |
|
1222 |
private static final class FixedDelayTask extends TaskFutureWrapper { |
1223 |
private volatile boolean firstRun = true; |
1224 |
private final AtomicLong nextRunTime = new AtomicLong(); |
1225 |
FixedDelayTask(Runnable run, long initialDelay, long period) { |
1226 |
super (run, initialDelay, period); |
1227 |
} |
1228 |
|
1229 |
@Override |
1230 |
public long getDelay(TimeUnit unit) { |
1231 |
long next = nextRunTime.get(); |
1232 |
return unit.convert (next - System.currentTimeMillis(), TimeUnit.MILLISECONDS); |
1233 |
} |
1234 |
|
1235 |
@Override |
1236 |
public void run() { |
1237 |
if (!fini()) { |
1238 |
toRun.run(); |
1239 |
} |
1240 |
if (!fini()) { |
1241 |
reschedule(); |
1242 |
} |
1243 |
} |
1244 |
|
1245 |
private boolean fini() { |
1246 |
boolean result = cancelled.get() || Thread.currentThread().isInterrupted(); |
1247 |
return result; |
1248 |
} |
1249 |
|
1250 |
private void reschedule() { |
1251 |
long delay; |
1252 |
if (firstRun) { |
1253 |
delay = initialDelay; |
1254 |
} else { |
1255 |
delay = period; |
1256 |
} |
1257 |
nextRunTime.set(System.currentTimeMillis() + delay); |
1258 |
firstRun = false; |
1259 |
if (!fini()) { |
1260 |
t.schedule((int) delay); |
1261 |
} |
1262 |
} |
1263 |
} |
1264 |
|
1265 |
private interface RunnableWrapper { |
1266 |
Runnable getRunnable(); |
1267 |
} |
1268 |
|
1269 |
private static final class WaitableCallable<T> implements Callable<T>, Cancellable { |
1270 |
private final CountDownLatch countdown; |
1271 |
private final Callable<T> delegate; |
1272 |
private final AtomicReference<T> ref; |
1273 |
private volatile boolean failed; |
1274 |
WaitableCallable(Callable<T> delegate, CountDownLatch countdown) { |
1275 |
this (delegate, null, countdown); |
1276 |
} |
1277 |
|
1278 |
WaitableCallable(Callable<T> delegate, AtomicReference<T> ref, CountDownLatch countdown) { |
1279 |
this.delegate = delegate; |
1280 |
this.countdown = countdown; |
1281 |
this.ref = ref; |
1282 |
} |
1283 |
|
1284 |
boolean failed() { |
1285 |
return failed; |
1286 |
} |
1287 |
|
1288 |
@Override |
1289 |
public T call() throws Exception { |
1290 |
try { |
1291 |
T result = delegate.call(); |
1292 |
if (ref != null) { |
1293 |
ref.set(result); |
1294 |
} |
1295 |
return result; |
1296 |
} catch (RuntimeException e) { |
1297 |
failed = true; |
1298 |
throw e; |
1299 |
} catch (Error e) { |
1300 |
failed = true; |
1301 |
throw e; |
1302 |
} finally { |
1303 |
if (!failed || ref == null) { |
1304 |
countdown.countDown(); |
1305 |
} |
1306 |
} |
1307 |
} |
1308 |
|
1309 |
@Override |
1310 |
public boolean cancel() { |
1311 |
return delegate instanceof Cancellable ? ((Cancellable) delegate).cancel() : true; |
1312 |
} |
1313 |
} |
1314 |
|
1315 |
private static class RPFutureTask<T> extends FutureTask<T> implements RunnableWrapper { |
1316 |
protected volatile Task task; |
1317 |
private final Runnable runnable; |
1318 |
private final Cancellable cancellable; |
1319 |
RPFutureTask(Callable<T> c) { |
1320 |
super (c); |
1321 |
this.runnable = null; |
1322 |
this.cancellable = c instanceof Cancellable ? (Cancellable) c : null; |
1323 |
} |
1324 |
|
1325 |
RPFutureTask(Runnable r, T result) { |
1326 |
super (r, result); |
1327 |
this.runnable = r; |
1328 |
this.cancellable = r instanceof Cancellable ? (Cancellable) r : null; |
1329 |
} |
1330 |
|
1331 |
void setTask(Task task) { |
1332 |
this.task = task; |
1333 |
} |
1334 |
|
1335 |
RPFutureTask(Callable<T> c, T predefinedResult) { |
1336 |
this (c); |
1337 |
set(predefinedResult); |
1338 |
} |
1339 |
|
1340 |
@Override |
1341 |
public boolean cancel(boolean mayInterruptIfRunning) { |
1342 |
if (cancellable != null) { |
1343 |
boolean result = cancellable.cancel(); |
1344 |
if (result) { |
1345 |
//note & not && - task.cancel() and super.cancel() must be invoked, |
1346 |
//if and only if the underlying cancellable is really null or |
1347 |
//returned true from cancel(). |
1348 |
return task.cancel() & super.cancel(mayInterruptIfRunning); |
1349 |
} else { |
1350 |
return result; |
1351 |
} |
1352 |
} else { |
1353 |
return task.cancel() & super.cancel(mayInterruptIfRunning); |
1354 |
} |
1355 |
} |
1356 |
|
1357 |
@Override |
1358 |
public Runnable getRunnable() { |
1359 |
return this.runnable; |
1360 |
} |
1361 |
} |
1362 |
|
1363 |
private static final class ScheduledRPFutureTask<T> extends RPFutureTask<T> implements ScheduledFuture<T> { |
1364 |
protected final long delayMillis; |
1365 |
ScheduledRPFutureTask(Callable<T> c, long delayMillis) { |
1366 |
super (c); |
1367 |
this.delayMillis = delayMillis; |
1368 |
} |
1369 |
|
1370 |
ScheduledRPFutureTask(Runnable r, T result, long delayMillis) { |
1371 |
super (r, result); |
1372 |
this.delayMillis = delayMillis; |
1373 |
} |
1374 |
|
1375 |
|
1376 |
@Override |
1377 |
public long getDelay(TimeUnit unit) { |
1378 |
return TimeUnit.MILLISECONDS.convert(delayMillis, unit); |
1379 |
} |
1380 |
|
1381 |
@Override |
1382 |
public int compareTo(Delayed o) { |
1383 |
//Can overflow, if one delay is, say, days, and the other, microseconds |
1384 |
long otherDelayMillis = o.getDelay(TimeUnit.MILLISECONDS); |
1385 |
return (int) (delayMillis - otherDelayMillis); |
1386 |
} |
1387 |
} |
1388 |
|
588 |
private class EnqueueTask extends TimerTask { |
1389 |
private class EnqueueTask extends TimerTask { |
589 |
Item itm; |
1390 |
Item itm; |
590 |
|
1391 |
|
Lines 610-615
Link Here
|
610 |
private int priority = Thread.MIN_PRIORITY; |
1411 |
private int priority = Thread.MIN_PRIORITY; |
611 |
private long time = 0; |
1412 |
private long time = 0; |
612 |
private Thread lastThread = null; |
1413 |
private Thread lastThread = null; |
|
|
1414 |
private AtomicBoolean cancelled; |
613 |
|
1415 |
|
614 |
/** @param run runnable to start |
1416 |
/** @param run runnable to start |
615 |
* @param delay amount of millis to wait |
1417 |
* @param delay amount of millis to wait |
Lines 685-690
Link Here
|
685 |
final Item localItem; |
1487 |
final Item localItem; |
686 |
|
1488 |
|
687 |
synchronized (processorLock) { |
1489 |
synchronized (processorLock) { |
|
|
1490 |
if (cancelled != null) { |
1491 |
cancelled.set(false); |
1492 |
} |
688 |
notifyRunning(); |
1493 |
notifyRunning(); |
689 |
|
1494 |
|
690 |
if (item != null) { |
1495 |
if (item != null) { |
Lines 747-752
Link Here
|
747 |
} |
1552 |
} |
748 |
} |
1553 |
} |
749 |
|
1554 |
|
|
|
1555 |
/** |
1556 |
* Implementation of cancel for use with Future objects, to guarantee |
1557 |
* that the thread will be interrupted, no matter what the setting |
1558 |
* on the owning RP. |
1559 |
* @param interrupt If true, the thread should be interrupted |
1560 |
* @return true if cancellation occurred |
1561 |
*/ |
1562 |
boolean cancel (boolean interrupt) { |
1563 |
synchronized (processorLock) { |
1564 |
if (cancelled != null) { |
1565 |
boolean wasCancelled = !cancelled.getAndSet(true); |
1566 |
if (wasCancelled) { |
1567 |
return false; |
1568 |
} |
1569 |
} |
1570 |
boolean success; |
1571 |
|
1572 |
if (item == null) { |
1573 |
success = false; |
1574 |
} else { |
1575 |
Processor p = item.getProcessor(); |
1576 |
success = item.clear(null); |
1577 |
|
1578 |
if (p != null) { |
1579 |
if (interrupt) { |
1580 |
success = p.interrupt(this, RequestProcessor.this); |
1581 |
} else { |
1582 |
//despite its name, will not actually interrupt |
1583 |
//unless the RP specifies that it should |
1584 |
p.interruptTask(this, RequestProcessor.this); |
1585 |
} |
1586 |
if (success) { |
1587 |
item = null; |
1588 |
} |
1589 |
} |
1590 |
} |
1591 |
if (success) { |
1592 |
notifyFinished(); // mark it as finished |
1593 |
} |
1594 |
return success; |
1595 |
} |
1596 |
} |
1597 |
|
750 |
/** Current priority of the task. |
1598 |
/** Current priority of the task. |
751 |
* @return the priority level (see e.g. {@link Thread#NORM_PRIORITY} |
1599 |
* @return the priority level (see e.g. {@link Thread#NORM_PRIORITY} |
752 |
*/ |
1600 |
*/ |
Lines 1062-1067
Link Here
|
1062 |
} |
1910 |
} |
1063 |
} |
1911 |
} |
1064 |
|
1912 |
|
|
|
1913 |
boolean belongsTo(RequestProcessor r) { |
1914 |
synchronized (lock) { |
1915 |
return source == r; |
1916 |
} |
1917 |
} |
1918 |
|
1065 |
/** |
1919 |
/** |
1066 |
* The method that will repeatedly wait for a request and perform it. |
1920 |
* The method that will repeatedly wait for a request and perform it. |
1067 |
*/ |
1921 |
*/ |
Lines 1190-1195
Link Here
|
1190 |
} |
2044 |
} |
1191 |
} |
2045 |
} |
1192 |
|
2046 |
|
|
|
2047 |
boolean interrupt (Task t, RequestProcessor src) { |
2048 |
if (t != todo) { |
2049 |
return false; |
2050 |
} |
2051 |
interrupt(); |
2052 |
return true; |
2053 |
} |
2054 |
|
1193 |
/** @see "#20467" */ |
2055 |
/** @see "#20467" */ |
1194 |
private static void doNotify(RequestProcessor.Task todo, Throwable ex) { |
2056 |
private static void doNotify(RequestProcessor.Task todo, Throwable ex) { |
1195 |
if (SLOW && todo.item.message == null) { |
2057 |
if (SLOW && todo.item.message == null) { |