Skip to content

Commit 6d0a973

Browse files
AlexanderSchultheisspmbittner
authored andcommitted
fix: remove bottleneck caused by waiting for the result of the next task
# Description ScheduledTasksIterator contained a bottleneck in its scheduling that hat a severe effect on the runtime performance, if some tasks take considerably longer than others. Before the fix, the iterator would only schedule at most `nThreads` tasks in its internal thread pool. Upon consumption of the iterator, a new task was only scheduled, if the next result became available (i.e., a call to `next()` returned a result). This creates a bottleneck, if the iterator has to block upon a `next()` call for a long task. This potentially leads to a high number of idle threads. With the fix, all tasks are scheduled upon creation of the iterator and their execution is managed by the thread pool with an internal queue. This way, all threads are busy until all scheduled tasks are done. The order of results is still managed by the list of futures, which guarantees that order of results is the same as the order of the given tasks. # Example Consider an iterator using a thread pool with 3 threads and 6 tasks to complete. Upon instantiation, the iterator would schedule the firsts 3 tasks for execution. This results in the following state for the tasks: ``` - task1: ONGOING - task2: ONGOING - task3: ONGOING - task4: NOT SCHEDULED - task5: NOT SCHEDULED - task6: NOT SCHEDULED ``` and for the threads: ``` - thread1: PROCESSING task1 - thread2: PROCESSING task2 - thread3: PROCESSING task3 ``` Now, assume that the iterator is being consumed and a bit of time passed. `task2` and `task3` are done; `task1` is ongoing. The iterator blocks upon the first `next()` call until the result is ready. No new tasks are scheduled. The state would now look as follows: ``` - task1: ONGOING - task2: DONE - task3: DONE - task4: NOT SCHEDULED - task5: NOT SCHEDULED - task6: NOT SCHEDULED ``` and for the threads: ``` - thread1: PROCESSING task1 - thread2: IDLE - thread3: IDLE ``` With the fix, all tasks are scheduled immediately and sequentially submitted to free threads by the thread pool. The initial state now looks like this: ``` - task1: ONGOING - task2: ONGOING - task3: ONGOING - task4: SCHEDULED - task5: SCHEDULED - task6: SCHEDULED ``` and for the threads: ``` - thread1: PROCESSING task1 - thread2: PROCESSING task2 - thread3: PROCESSING task3 ``` And the second state with the same conditions as above looks like this: ``` - task1: ONGOING - task2: DONE - task3: DONE - task4: ONGOING - task5: ONGOING - task6: NOT SCHEDULED ``` and for the threads: ``` - thread1: PROCESSING task1 - thread2: PROCESSING task4 - thread3: PROCESSING task5 ```
1 parent cec1747 commit 6d0a973

1 file changed

Lines changed: 6 additions & 16 deletions

File tree

src/main/java/org/variantsync/diffdetective/parallel/ScheduledTasksIterator.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99
* The results of the given tasks can be received in the correct order using {@link next}. That
1010
* means the {@link next} method is deterministic if all tasks are deterministic.
1111
*
12-
* <p>No extra thread for scheduling is used so new tasks are only scheduled if {@link next} is
13-
* called.
12+
* <p>Upon creation of a ScheduledTasksIterator, all given tasks are scheduled for execution. However, the results of these
13+
* tasks become available in the same order as the tasks have been provided. This means, that the iterator will block
14+
* until the result of the next task is available, even if all other remaining tasks are complete.
1415
*/
1516
public class ScheduledTasksIterator<T> implements Iterator<T>, AutoCloseable {
1617
private final Iterator<? extends Callable<T>> remainingTasks;
@@ -20,7 +21,7 @@ public class ScheduledTasksIterator<T> implements Iterator<T>, AutoCloseable {
2021
/**
2122
* Starts scheduling {@code tasks} in {@code nThreads} other threads.
2223
* The results of these tasks can be retrieved by calling {@link next}. The order of these
23-
* results not deterministic and can't be assumed to be the same as in {@code tasks}.
24+
* results is deterministic and is the same as the order of the provided {@code tasks}.
2425
*
2526
* @param tasks the tasks which will be executed in other threads
2627
* @param nThreads the number of threads which work on {@code tasks} in parallel
@@ -29,8 +30,8 @@ public ScheduledTasksIterator(final Iterator<? extends Callable<T>> tasks, final
2930
this.remainingTasks = tasks;
3031
this.futures = new LinkedList<>();
3132
this.threadPool = Executors.newFixedThreadPool(nThreads);
32-
for (int i = 0; i < nThreads; i++) {
33-
scheduleNext();
33+
while (this.remainingTasks.hasNext()) {
34+
futures.add(threadPool.submit(remainingTasks.next()));
3435
}
3536
}
3637

@@ -44,16 +45,6 @@ public ScheduledTasksIterator(final Iterable<? extends Callable<T>> tasks, final
4445
this(tasks.iterator(), nThreads);
4546
}
4647

47-
/**
48-
* Schedule the next task on the thread pool and adds the result future to the {@code futures}
49-
* queue.
50-
*/
51-
private synchronized void scheduleNext() {
52-
if (this.remainingTasks.hasNext()) {
53-
futures.add(threadPool.submit(remainingTasks.next()));
54-
}
55-
}
56-
5748
@Override
5849
public boolean hasNext() {
5950
return !futures.isEmpty();
@@ -70,7 +61,6 @@ public boolean hasNext() {
7061
*/
7162
@Override
7263
public T next() {
73-
scheduleNext();
7464
try {
7565
return futures.removeFirst().get();
7666
} catch (final InterruptedException | ExecutionException e) {

0 commit comments

Comments
 (0)