Lines 64-69
Link Here
|
64 |
* <CODE>RequestProcessor</CODE> instance with limited throughput (probably |
64 |
* <CODE>RequestProcessor</CODE> instance with limited throughput (probably |
65 |
* set to 1), the IDE would try to run all your requests in parallel otherwise. |
65 |
* set to 1), the IDE would try to run all your requests in parallel otherwise. |
66 |
* |
66 |
* |
|
|
67 |
* <P> |
68 |
* Since version 6.3 there is conditional support for interrupting long-running tasks. |
69 |
* There has always been a way to cancel a not yet running task using {@link RequestProcessor.Task#cancel } |
70 |
* but if the task was already running, one was out of luck. Since version 6.3 |
71 |
* the thread running the task is interrupted and the Runnable can check for that |
72 |
* and terminate its execution sooner. In the runnable one shall check for |
73 |
* thread interruption (done from {@link RequestProcessor.Task#cancel }) and |
74 |
* if true, return immediately as in this example: |
75 |
* <PRE> |
76 |
* public void run () { |
77 |
* while (veryLongTimeLoop) { |
78 |
* doAPieceOfIt (); |
79 |
* |
80 |
* if (Thread.interrupted ()) return; |
81 |
* } |
82 |
* } |
83 |
* </PRE> |
84 |
* |
67 |
* @author Petr Nejedly, Jaroslav Tulach |
85 |
* @author Petr Nejedly, Jaroslav Tulach |
68 |
*/ |
86 |
*/ |
69 |
public final class RequestProcessor { |
87 |
public final class RequestProcessor { |
Lines 112-117
Link Here
|
112 |
/** The maximal number of processors that can perform the requests sent |
130 |
/** The maximal number of processors that can perform the requests sent |
113 |
* to this RequestProcessor. If 1, all the requests are serialized. */ |
131 |
* to this RequestProcessor. If 1, all the requests are serialized. */ |
114 |
private int throughput; |
132 |
private int throughput; |
|
|
133 |
|
134 |
/** Whether {@link RequestProcessor.Task#cancel} shall interrupt the thread running the task. */ |
135 |
private boolean interruptThread; |
115 |
|
136 |
|
116 |
/** Creates new RequestProcessor with automatically assigned unique name. */ |
137 |
/** Creates new RequestProcessor with automatically assigned unique name. */ |
117 |
public RequestProcessor() { |
138 |
public RequestProcessor() { |
Lines 131-140
Link Here
|
131 |
* @since OpenAPI version 2.12 |
152 |
* @since OpenAPI version 2.12 |
132 |
*/ |
153 |
*/ |
133 |
public RequestProcessor(String name, int throughput) { |
154 |
public RequestProcessor(String name, int throughput) { |
|
|
155 |
this(name, throughput, false); |
156 |
} |
157 |
|
158 |
/** Creates a new named RequestProcessor with defined throughput which |
159 |
* can support interruption of the thread the processor runs in. |
160 |
* There has always been a way to cancel a not yet running task using {@link RequestProcessor.Task#cancel } |
161 |
* but if the task was already running, one was out of luck. With this |
162 |
* constructor one can create a {@link RequestProcessor} whose threads |
163 |
* running tasks are interrupted and the Runnable can check for that |
164 |
* and terminate its execution sooner. In the runnable one shall check for |
165 |
* thread interruption (done from {@link RequestProcessor.Task#cancel }) and |
166 |
* if true, return immediately as in this example: |
167 |
* <PRE> |
168 |
* public void run () { |
169 |
* while (veryLongTimeLoop) { |
170 |
* doAPieceOfIt (); |
171 |
* |
172 |
* if (Thread.interrupted ()) return; |
173 |
* } |
174 |
* } |
175 |
* </PRE> |
176 |
* |
177 |
* @param name the name to use for the request processor thread |
178 |
* @param throughput the maximal count of requests allowed to run in parallel |
179 |
* @param interruptThread true if {@link RequestProcessor.Task#cancel} shall interrupt the thread |
180 |
* |
181 |
* @since 6.3 |
182 |
*/ |
183 |
public RequestProcessor(String name, int throughput, boolean interruptThread) { |
134 |
this.throughput = throughput; |
184 |
this.throughput = throughput; |
135 |
this.name = (name != null) ? name : ("OpenIDE-request-processor-" + (counter++)); |
185 |
this.name = (name != null) ? name : ("OpenIDE-request-processor-" + (counter++)); |
|
|
186 |
this.interruptThread = interruptThread; |
136 |
} |
187 |
} |
137 |
|
188 |
|
|
|
189 |
|
138 |
/** The getter for the shared instance of the <CODE>RequestProcessor</CODE>. |
190 |
/** The getter for the shared instance of the <CODE>RequestProcessor</CODE>. |
139 |
* |
191 |
* |
140 |
* @return an instance of RequestProcessor that is capable of performing |
192 |
* @return an instance of RequestProcessor that is capable of performing |
Lines 364-384
Link Here
|
364 |
} |
416 |
} |
365 |
|
417 |
|
366 |
Task askForWork(Processor worker, String debug) { |
418 |
Task askForWork(Processor worker, String debug) { |
367 |
synchronized (processorLock) { |
419 |
if (stopped || queue.isEmpty()) { // no more work in this burst, return him |
368 |
if (stopped || queue.isEmpty()) { // no more work in this burst, return him |
420 |
processors.remove(worker); |
369 |
processors.remove(worker); |
421 |
Processor.put(worker, debug); |
370 |
Processor.put(worker, debug); |
422 |
running--; |
371 |
running--; |
423 |
|
372 |
|
424 |
return null; |
373 |
return null; |
425 |
} else { // we have some work for the worker, pass it |
374 |
} else { // we have some work for the worker, pass it |
426 |
|
375 |
|
427 |
Item i = (Item) queue.remove(0); |
376 |
Item i = (Item) queue.remove(0); |
428 |
Task t = i.getTask(); |
377 |
Task t = i.getTask(); |
429 |
i.clear(worker); |
378 |
i.clear(); |
|
|
379 |
|
430 |
|
380 |
return t; |
431 |
return t; |
381 |
} |
|
|
382 |
} |
432 |
} |
383 |
} |
433 |
} |
384 |
|
434 |
|
Lines 458-464
Link Here
|
458 |
notifyRunning(); |
508 |
notifyRunning(); |
459 |
|
509 |
|
460 |
if (item != null) { |
510 |
if (item != null) { |
461 |
item.clear(); |
511 |
item.clear(null); |
462 |
} |
512 |
} |
463 |
|
513 |
|
464 |
item = new Item(this, RequestProcessor.this); |
514 |
item = new Item(this, RequestProcessor.this); |
Lines 490-496
Link Here
|
490 |
*/ |
540 |
*/ |
491 |
public boolean cancel() { |
541 |
public boolean cancel() { |
492 |
synchronized (processorLock) { |
542 |
synchronized (processorLock) { |
493 |
boolean success = (item == null) ? false : item.clear(); |
543 |
boolean success; |
|
|
544 |
|
545 |
if (item == null) { |
546 |
success = false; |
547 |
} else { |
548 |
Processor p = item.getProcessor(); |
549 |
success = item.clear(null); |
550 |
|
551 |
if (p != null) { |
552 |
p.interruptTask(this, RequestProcessor.this); |
553 |
item = null; |
554 |
} |
555 |
} |
494 |
|
556 |
|
495 |
if (success) { |
557 |
if (success) { |
496 |
notifyFinished(); // mark it as finished |
558 |
notifyFinished(); // mark it as finished |
Lines 541-557
Link Here
|
541 |
*/ |
603 |
*/ |
542 |
public void waitFinished() { |
604 |
public void waitFinished() { |
543 |
if (isRequestProcessorThread()) { //System.err.println("Task.waitFinished on " + this + " from other task in RP: " + Thread.currentThread().getName()); |
605 |
if (isRequestProcessorThread()) { //System.err.println("Task.waitFinished on " + this + " from other task in RP: " + Thread.currentThread().getName()); |
544 |
|
|
|
545 |
boolean toRun; |
606 |
boolean toRun; |
546 |
|
607 |
|
547 |
synchronized (processorLock) { |
608 |
synchronized (processorLock) { |
548 |
// correct line: toRun = (item == null) ? !isFinished (): (item.clear() && !isFinished ()); |
609 |
// correct line: toRun = (item == null) ? !isFinished (): (item.clear() && !isFinished ()); |
549 |
// the same: toRun = !isFinished () && (item == null ? true : item.clear ()); |
610 |
// the same: toRun = !isFinished () && (item == null ? true : item.clear ()); |
550 |
toRun = !isFinished() && ((item == null) || item.clear()); |
611 |
toRun = !isFinished() && ((item == null) || item.clear(null)); |
551 |
} |
612 |
} |
552 |
|
613 |
|
553 |
if (toRun) { //System.err.println(" ## running it synchronously"); |
614 |
if (toRun) { //System.err.println(" ## running it synchronously"); |
554 |
run(); |
615 |
Processor processor = (Processor)Thread.currentThread(); |
|
|
616 |
processor.doEvaluate (this, processorLock, RequestProcessor.this); |
555 |
} else { // it is already running in other thread of this RP |
617 |
} else { // it is already running in other thread of this RP |
556 |
|
618 |
|
557 |
if (lastThread != Thread.currentThread()) { |
619 |
if (lastThread != Thread.currentThread()) { |
Lines 586-592
Link Here
|
586 |
boolean toRun; |
648 |
boolean toRun; |
587 |
|
649 |
|
588 |
synchronized (processorLock) { |
650 |
synchronized (processorLock) { |
589 |
toRun = !isFinished() && ((item == null) || item.clear()); |
651 |
toRun = !isFinished() && ((item == null) || item.clear(null)); |
590 |
} |
652 |
} |
591 |
|
653 |
|
592 |
if (toRun) { |
654 |
if (toRun) { |
Lines 614-620
Link Here
|
614 |
/* One item representing the task pending in the pending queue */ |
676 |
/* One item representing the task pending in the pending queue */ |
615 |
private static class Item extends Exception { |
677 |
private static class Item extends Exception { |
616 |
private final RequestProcessor owner; |
678 |
private final RequestProcessor owner; |
617 |
private Task action; |
679 |
private Object action; |
618 |
private boolean enqueued; |
680 |
private boolean enqueued; |
619 |
|
681 |
|
620 |
Item(Task task, RequestProcessor rp) { |
682 |
Item(Task task, RequestProcessor rp) { |
Lines 624-645
Link Here
|
624 |
} |
686 |
} |
625 |
|
687 |
|
626 |
Task getTask() { |
688 |
Task getTask() { |
627 |
return action; |
689 |
Object a = action; |
|
|
690 |
|
691 |
return (a instanceof Task) ? (Task) a : null; |
628 |
} |
692 |
} |
629 |
|
693 |
|
630 |
/** Annulate this request iff still possible. |
694 |
/** Annulate this request iff still possible. |
631 |
* @return true if it was possible to skip this item, false |
695 |
* @return true if it was possible to skip this item, false |
632 |
* if the item was/is already processed */ |
696 |
* if the item was/is already processed */ |
633 |
boolean clear() { |
697 |
boolean clear(Processor processor) { |
634 |
synchronized (owner.processorLock) { |
698 |
synchronized (owner.processorLock) { |
635 |
action = null; |
699 |
action = processor; |
636 |
|
700 |
|
637 |
return enqueued ? owner.queue.remove(this) : true; |
701 |
return enqueued ? owner.queue.remove(this) : true; |
638 |
} |
702 |
} |
639 |
} |
703 |
} |
640 |
|
704 |
|
|
|
705 |
Processor getProcessor() { |
706 |
Object a = action; |
707 |
|
708 |
return (a instanceof Processor) ? (Processor) a : null; |
709 |
} |
710 |
|
641 |
int getPriority() { |
711 |
int getPriority() { |
642 |
return action.getPriority(); |
712 |
return getTask().getPriority(); |
643 |
} |
713 |
} |
644 |
|
714 |
|
645 |
public Throwable fillInStackTrace() { |
715 |
public Throwable fillInStackTrace() { |
Lines 669-674
Link Here
|
669 |
|
739 |
|
670 |
//private Item task; |
740 |
//private Item task; |
671 |
private RequestProcessor source; |
741 |
private RequestProcessor source; |
|
|
742 |
|
743 |
/** task we are working on */ |
744 |
private RequestProcessor.Task todo; |
672 |
private boolean idle = true; |
745 |
private boolean idle = true; |
673 |
|
746 |
|
674 |
/** Waiting lock */ |
747 |
/** Waiting lock */ |
Lines 771-777
Link Here
|
771 |
} |
844 |
} |
772 |
} |
845 |
} |
773 |
|
846 |
|
774 |
Task todo; |
|
|
775 |
String debug = null; |
847 |
String debug = null; |
776 |
|
848 |
|
777 |
ErrorManager em = logger(); |
849 |
ErrorManager em = logger(); |
Lines 782-789
Link Here
|
782 |
} |
854 |
} |
783 |
|
855 |
|
784 |
// while we have something to do |
856 |
// while we have something to do |
785 |
while ((todo = current.askForWork(this, debug)) != null) { |
857 |
for (;;) { |
786 |
// if(todo != null) { |
858 |
// need the same sync as interruptTask |
|
|
859 |
synchronized (current.processorLock) { |
860 |
todo = current.askForWork(this, debug); |
861 |
if (todo == null) break; |
862 |
} |
787 |
setPrio(todo.getPriority()); |
863 |
setPrio(todo.getPriority()); |
788 |
|
864 |
|
789 |
try { |
865 |
try { |
Lines 813-827
Link Here
|
813 |
doNotify(todo, t); |
889 |
doNotify(todo, t); |
814 |
} |
890 |
} |
815 |
|
891 |
|
816 |
// to improve GC |
892 |
// need the same sync as interruptTask |
817 |
todo = null; |
893 |
synchronized (current.processorLock) { |
818 |
|
894 |
// to improve GC |
819 |
// } |
895 |
todo = null; |
|
|
896 |
// and to clear any possible interrupted state |
897 |
// set by calling Task.cancel () |
898 |
Thread.interrupted(); |
899 |
} |
820 |
} |
900 |
} |
821 |
|
901 |
|
822 |
if (loggable) { |
902 |
if (loggable) { |
823 |
logger().log(ErrorManager.INFORMATIONAL, "Work finished " + getName()); // NOI18N |
903 |
logger().log(ErrorManager.INFORMATIONAL, "Work finished " + getName()); // NOI18N |
824 |
} |
904 |
} |
|
|
905 |
} |
906 |
} |
907 |
|
908 |
/** Runs the given task directly on the current thread, recording it as this processor's current task while it runs. |
909 |
*/ |
910 |
final void doEvaluate (Task t, Object processorLock, RequestProcessor src) { |
911 |
Task previous = todo; |
912 |
boolean interrupted = Thread.interrupted(); |
913 |
try { |
914 |
todo = t; |
915 |
t.run (); |
916 |
} finally { |
917 |
synchronized (processorLock) { |
918 |
todo = previous; |
919 |
if (interrupted || todo.item == null) { |
920 |
if (src.interruptThread) { |
921 |
// reinterrupt the thread if it was interrupted and |
922 |
// we support interrupts |
923 |
Thread.currentThread().interrupt(); |
924 |
} |
925 |
} |
926 |
} |
927 |
} |
928 |
} |
929 |
|
930 |
/** Called under the processorLock */ |
931 |
public void interruptTask(Task t, RequestProcessor src) { |
932 |
if (t != todo) { |
933 |
// not running this task so |
934 |
return; |
935 |
} |
936 |
|
937 |
if (src.interruptThread) { |
938 |
// otherwise interrupt this thread |
939 |
interrupt(); |
825 |
} |
940 |
} |
826 |
} |
941 |
} |
827 |
|
942 |
|