Skip to content

Commit 373e389

Browse files
Updated AsyncWorker (and defined AsyncProcessor) so that we get back … (#212)
* Updated AsyncWorker (and defined AsyncProcessor) so that we get back Futures for processed tasks. Those futures can then be used to block on and wait if desired. Exceptions during async processing are now also caught and logged, rather than causing the thread to die and leaving us in a bad state. * Changed AsyncProcessor to use a single threaded executor - allowing for a graceful shutdown. * Added logging for async processor start up as well. * Split awaitTermination and close + removed running flag Co-authored-by: Gianlu <[email protected]>
1 parent 766d165 commit 373e389

6 files changed

Lines changed: 139 additions & 36 deletions

File tree

api/src/main/resources/log4j.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ log4j.rootLogger=TRACE, stdout
22
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
33
log4j.appender.stdout.Target=System.out
44
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
5-
log4j.appender.stdout.layout.ConversionPattern=%d %-5p %c{1}:%L - %m%n
5+
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c{1}:%L - %m%n
66
# Mute Undertow and its dependencies
77
log4j.logger.io.undertow=WARN
88
log4j.logger.org.jboss=WARN
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package xyz.gianlu.librespot.common;
2+
3+
import org.apache.log4j.Logger;
4+
import org.jetbrains.annotations.NotNull;
5+
6+
import java.io.Closeable;
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.Executors;
9+
import java.util.concurrent.Future;
10+
import java.util.concurrent.TimeUnit;
11+
import java.util.function.Function;
12+
13+
/**
14+
* Simple worker thread that processes tasks sequentially
15+
*
16+
* @param <REQ> The type of task/input that AsyncProcessor handles.
17+
* @param <RES> Return type of our processor implementation
18+
*/
19+
public class AsyncProcessor<REQ, RES> implements Closeable {
20+
private static final Logger LOGGER = Logger.getLogger(AsyncProcessor.class);
21+
private final String name;
22+
private final Function<REQ, RES> processor;
23+
private final ExecutorService executor;
24+
25+
/**
26+
* @param name name of async processor - used for thread name and logging
27+
* @param processor actual processing implementation ran on background thread
28+
*/
29+
public AsyncProcessor(@NotNull String name, @NotNull Function<REQ, RES> processor) {
30+
executor = Executors.newSingleThreadExecutor(new NameThreadFactory(r -> name));
31+
this.name = name;
32+
this.processor = processor;
33+
LOGGER.trace(String.format("AsyncProcessor{%s} has started", name));
34+
}
35+
36+
public Future<RES> submit(@NotNull REQ task) {
37+
return executor.submit(() -> processor.apply(task));
38+
}
39+
40+
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
41+
if (!executor.isShutdown())
42+
throw new IllegalStateException(String.format("AsyncProcessor{%s} hasn't been shut down yet", name));
43+
44+
if (executor.awaitTermination(timeout, unit)) {
45+
LOGGER.trace(String.format("AsyncProcessor{%s} is shut down", name));
46+
return true;
47+
} else {
48+
return false;
49+
}
50+
}
51+
52+
@Override
53+
public void close() {
54+
LOGGER.trace(String.format("AsyncProcessor{%s} is shutting down", name));
55+
executor.shutdown();
56+
}
57+
}
Lines changed: 20 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,38 @@
11
package xyz.gianlu.librespot.common;
22

3-
import org.apache.log4j.Logger;
43
import org.jetbrains.annotations.NotNull;
54

65
import java.io.Closeable;
7-
import java.util.concurrent.BlockingQueue;
8-
import java.util.concurrent.LinkedBlockingQueue;
6+
import java.util.concurrent.Future;
7+
import java.util.concurrent.TimeUnit;
98
import java.util.function.Consumer;
109

11-
public class AsyncWorker<T> implements Closeable, Runnable {
12-
private static final Logger LOGGER = Logger.getLogger(AsyncWorker.class);
13-
private final Thread thread;
14-
private final BlockingQueue<T> internalQueue;
15-
private final String name;
16-
private final Consumer<T> consumer;
17-
private volatile boolean running = true;
10+
/**
11+
* Wrapper around AsyncProcessor that deals with void methods and does not expect a response type
12+
*
13+
* @param <T> Task type for processor
14+
*/
15+
public class AsyncWorker<T> implements Closeable {
16+
private final AsyncProcessor<T, Void> underlyingProcessor;
1817

1918
public AsyncWorker(@NotNull String name, @NotNull Consumer<T> consumer) {
20-
this.name = name;
21-
this.consumer = consumer;
22-
23-
internalQueue = new LinkedBlockingQueue<>();
24-
thread = new Thread(this, name);
25-
thread.start();
19+
this.underlyingProcessor = new AsyncProcessor<>(name, t -> {
20+
consumer.accept(t);
21+
return null;
22+
});
2623
}
2724

28-
public void submit(@NotNull T task) {
29-
internalQueue.add(task);
25+
@NotNull
26+
public Future<Void> submit(@NotNull T task) {
27+
return underlyingProcessor.submit(task);
3028
}
3129

32-
@Override
33-
public void close() {
34-
running = false;
35-
thread.interrupt();
30+
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
31+
return underlyingProcessor.awaitTermination(timeout, unit);
3632
}
3733

3834
@Override
39-
public void run() {
40-
LOGGER.trace(String.format("AsyncWorker{%s} is starting", name));
41-
42-
while (running) {
43-
try {
44-
T polled = internalQueue.take();
45-
consumer.accept(polled);
46-
} catch (InterruptedException ignored) {
47-
}
48-
}
49-
50-
LOGGER.trace(String.format("AsyncWorker{%s} is shutting down", name));
35+
public void close() {
36+
underlyingProcessor.close();
5137
}
5238
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
log4j.rootLogger=TRACE, stdout
2+
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
3+
log4j.appender.stdout.Target=System.out
4+
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
5+
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c{1}:%L - %m%n
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package xyz.gianlu.librespot.common;
2+
3+
import org.junit.jupiter.api.Assertions;
4+
import org.junit.jupiter.api.Test;
5+
6+
import java.util.concurrent.ExecutionException;
7+
import java.util.concurrent.Future;
8+
import java.util.concurrent.RejectedExecutionException;
9+
import java.util.concurrent.atomic.AtomicInteger;
10+
11+
public class AsyncProcessorTest {
12+
13+
@Test
14+
void testAsyncProcessor() throws ExecutionException, InterruptedException {
15+
AtomicInteger internalState = new AtomicInteger();
16+
AsyncProcessor<Integer, Integer> asyncProcessor = new AsyncProcessor<>("test-processor-1", internalState::addAndGet);
17+
asyncProcessor.submit(1);
18+
asyncProcessor.submit(2);
19+
asyncProcessor.submit(3);
20+
Future<Integer> lastTask = asyncProcessor.submit(4);
21+
22+
Integer lastResult = lastTask.get(); // we only need to wait for the last one as tasks are executed in order
23+
Assertions.assertEquals(10, internalState.get());
24+
Assertions.assertEquals(10, lastResult);
25+
asyncProcessor.close();
26+
}
27+
28+
@Test
29+
void testAsyncProcessorExceptionHandling() {
30+
AsyncProcessor<Integer, Integer> asyncProcessor = new AsyncProcessor<>("test-processor-2", i -> {
31+
throw new IllegalStateException();
32+
});
33+
34+
Future<Integer> firstTask = asyncProcessor.submit(1);
35+
Assertions.assertThrows(ExecutionException.class, firstTask::get);
36+
37+
// now we check our loop didn't break and we are able to submit more tasks to our queue
38+
Future<Integer> secondTask = asyncProcessor.submit(1);
39+
Assertions.assertThrows(ExecutionException.class, secondTask::get);
40+
asyncProcessor.close();
41+
}
42+
43+
@Test
44+
void testAsyncProcessorFailAfterShutdown() throws ExecutionException, InterruptedException {
45+
AtomicInteger internalState = new AtomicInteger();
46+
AsyncProcessor<Integer, Integer> asyncProcessor = new AsyncProcessor<>("test-processor-3", internalState::addAndGet);
47+
48+
Future<Integer> taskBeforeShutdown = asyncProcessor.submit(1);
49+
Assertions.assertEquals(1, taskBeforeShutdown.get());
50+
51+
asyncProcessor.close();
52+
53+
Assertions.assertThrows(RejectedExecutionException.class, () -> asyncProcessor.submit(1));
54+
}
55+
}

core/src/main/resources/log4j.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ log4j.rootLogger=TRACE, stdout
22
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
33
log4j.appender.stdout.Target=System.out
44
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
5-
log4j.appender.stdout.layout.ConversionPattern=%d %-5p %c{1}:%L - %m%n
5+
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c{1}:%L - %m%n

0 commit comments

Comments
 (0)