Skip to content

Commit b33485d

Browse files
Extracted AsyncWorker we can use for multiple consumers in code base. (#206)
* Extracted AsyncWorker we can use for multiple consumers in code base. * Changed another consumer to use AsyncWorker pattern. * Reformatted code + annotations + do not catch RuntimeExceptions (to match old behaviour) * Using AsyncWorker in PacketsManager and DealerClient Co-authored-by: Gianlu <[email protected]>
1 parent e652371 commit b33485d

8 files changed

Lines changed: 206 additions & 225 deletions

File tree

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package xyz.gianlu.librespot.common;
2+
3+
import org.apache.log4j.Logger;
4+
import org.jetbrains.annotations.NotNull;
5+
6+
import java.io.Closeable;
7+
import java.util.concurrent.BlockingQueue;
8+
import java.util.concurrent.LinkedBlockingQueue;
9+
import java.util.function.Consumer;
10+
11+
public class AsyncWorker<T> implements Closeable, Runnable {
12+
private static final Logger LOGGER = Logger.getLogger(AsyncWorker.class);
13+
private final Thread thread;
14+
private final BlockingQueue<T> internalQueue;
15+
private final String name;
16+
private final Consumer<T> consumer;
17+
private volatile boolean running = true;
18+
19+
public AsyncWorker(@NotNull String name, @NotNull Consumer<T> consumer) {
20+
this.name = name;
21+
this.consumer = consumer;
22+
23+
internalQueue = new LinkedBlockingQueue<>();
24+
thread = new Thread(this, name);
25+
thread.start();
26+
}
27+
28+
public void submit(@NotNull T task) {
29+
internalQueue.add(task);
30+
}
31+
32+
@Override
33+
public void close() {
34+
running = false;
35+
thread.interrupt();
36+
}
37+
38+
@Override
39+
public void run() {
40+
while (running) {
41+
try {
42+
T polled = internalQueue.take();
43+
consumer.accept(polled);
44+
} catch (InterruptedException ignored) {
45+
}
46+
}
47+
48+
LOGGER.trace(String.format("AsyncWorker{%s} is shutting down", name));
49+
}
50+
}

core/src/main/java/xyz/gianlu/librespot/connectstate/DeviceStateHandler.java

Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.jetbrains.annotations.NotNull;
1212
import org.jetbrains.annotations.Nullable;
1313
import xyz.gianlu.librespot.Version;
14+
import xyz.gianlu.librespot.common.AsyncWorker;
1415
import xyz.gianlu.librespot.common.ProtoUtils;
1516
import xyz.gianlu.librespot.core.Session;
1617
import xyz.gianlu.librespot.core.TimeProvider;
@@ -23,9 +24,6 @@
2324
import java.io.Closeable;
2425
import java.io.IOException;
2526
import java.util.*;
26-
import java.util.concurrent.BlockingQueue;
27-
import java.util.concurrent.LinkedBlockingQueue;
28-
import java.util.concurrent.TimeUnit;
2927

3028
/**
3129
* @author Gianlu
@@ -45,21 +43,19 @@ public final class DeviceStateHandler implements Closeable, DealerClient.Message
4543
private final Connect.DeviceInfo.Builder deviceInfo;
4644
private final List<Listener> listeners = Collections.synchronizedList(new ArrayList<>());
4745
private final Connect.PutStateRequest.Builder putState;
48-
private final Worker worker;
46+
private final AsyncWorker<Connect.PutStateRequest> putStateWorker;
4947
private volatile String connectionId = null;
5048

5149
public DeviceStateHandler(@NotNull Session session) {
5250
this.session = session;
5351
this.deviceInfo = initializeDeviceInfo(session);
54-
this.worker = new Worker();
52+
this.putStateWorker = new AsyncWorker<>("put-state-worker", this::putConnectState);
5553
this.putState = Connect.PutStateRequest.newBuilder()
5654
.setMemberType(Connect.MemberType.CONNECT_STATE)
5755
.setDevice(Connect.Device.newBuilder()
5856
.setDeviceInfo(deviceInfo)
5957
.build());
6058

61-
new Thread(worker, "put-state-worker").start();
62-
6359
session.dealer().addMessageListener(this, "hm://pusher/v1/connections/", "hm://connect-state/v1/connect/volume", "hm://connect-state/v1/cluster");
6460
session.dealer().addRequestListener(this, "hm://connect-state/v1/");
6561
session.mercury().interestedIn("hm://pusher/v1/connections/", this);
@@ -211,7 +207,7 @@ public synchronized void updateState(@NotNull Connect.PutStateReason reason, @No
211207
.setClientSideTimestamp(TimeProvider.currentTimeMillis())
212208
.getDeviceBuilder().setDeviceInfo(deviceInfo).setPlayerState(state);
213209

214-
worker.submit(putState.build());
210+
putStateWorker.submit(putState.build());
215211
}
216212

217213
public synchronized int getVolume() {
@@ -233,10 +229,25 @@ public void close() {
233229
session.dealer().removeRequestListener(this);
234230
session.mercury().notInterested(this);
235231

236-
worker.stop();
232+
putStateWorker.close();
237233
listeners.clear();
238234
}
239235

236+
/**
237+
* Performs the network request related to {@link Connect.PutStateRequest}. This MUST be called only from {@link DeviceStateHandler#putStateWorker}.
238+
*
239+
* @param req The {@link Connect.PutStateRequest}
240+
*/
241+
private void putConnectState(@NotNull Connect.PutStateRequest req) {
242+
try {
243+
session.api().putConnectState(connectionId, req);
244+
LOGGER.info(String.format("Put state. {ts: %d, connId: %s[truncated], reason: %s, request: %s}", req.getClientSideTimestamp(), connectionId.substring(0, 6),
245+
req.getPutStateReason(), ProtoUtils.toLogString(putState, LOGGER)));
246+
} catch (IOException | MercuryClient.MercuryException ex) {
247+
LOGGER.error("Failed updating state.", ex);
248+
}
249+
}
250+
240251
public enum Endpoint {
241252
Play("play"), Pause("pause"), Resume("resume"), SeekTo("seek_to"), SkipNext("skip_next"),
242253
SkipPrev("skip_prev"), SetShufflingContext("set_shuffling_context"), SetRepeatingContext("set_repeating_context"),
@@ -429,38 +440,4 @@ public Boolean valueBool() {
429440
}
430441
}
431442

432-
private class Worker implements Runnable {
433-
private final BlockingQueue<Connect.PutStateRequest> queue = new LinkedBlockingQueue<>();
434-
private volatile boolean shouldStop = false;
435-
436-
void stop() {
437-
shouldStop = true;
438-
}
439-
440-
void submit(@NotNull Connect.PutStateRequest request) {
441-
queue.add(request);
442-
}
443-
444-
@Override
445-
public void run() {
446-
while (!shouldStop) {
447-
Connect.PutStateRequest req;
448-
try {
449-
req = queue.poll(1000, TimeUnit.MILLISECONDS);
450-
if (shouldStop) return;
451-
if (req == null) continue;
452-
} catch (InterruptedException ignored) {
453-
continue;
454-
}
455-
456-
try {
457-
session.api().putConnectState(connectionId, req);
458-
LOGGER.info(String.format("Put state. {ts: %d, connId: %s[truncated], reason: %s, request: %s}", req.getClientSideTimestamp(), connectionId.substring(0, 6),
459-
req.getPutStateReason(), ProtoUtils.toLogString(putState, LOGGER)));
460-
} catch (IOException | MercuryClient.MercuryException ex) {
461-
LOGGER.error("Failed updating state.", ex);
462-
}
463-
}
464-
}
465-
}
466443
}
Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,30 @@
11
package xyz.gianlu.librespot.core;
22

33
import org.jetbrains.annotations.NotNull;
4+
import xyz.gianlu.librespot.common.AsyncWorker;
45
import xyz.gianlu.librespot.crypto.Packet;
56

67
import java.io.IOException;
7-
import java.util.concurrent.BlockingQueue;
88
import java.util.concurrent.ExecutorService;
9-
import java.util.concurrent.LinkedBlockingQueue;
109

1110
/**
1211
* @author Gianlu
1312
*/
1413
public abstract class PacketsManager implements AutoCloseable {
1514
protected final Session session;
16-
private final BlockingQueue<Packet> queue;
17-
private final Looper looper;
1815
private final ExecutorService executorService;
16+
private final AsyncWorker<Packet> asyncWorker;
1917

20-
public PacketsManager(@NotNull Session session) {
18+
public PacketsManager(@NotNull Session session, @NotNull String name) {
2119
this.session = session;
2220
this.executorService = session.executor();
23-
this.queue = new LinkedBlockingQueue<>();
24-
this.looper = new Looper();
25-
new Thread(looper, "packets-manager-" + looper.hashCode()).start();
21+
this.asyncWorker = new AsyncWorker<>("pm-" + name, packet -> executorService.execute(() -> {
22+
try {
23+
handle(packet);
24+
} catch (IOException ex) {
25+
exception(ex);
26+
}
27+
}));
2628
}
2729

2830
public final void dispatch(@NotNull Packet packet) {
@@ -31,42 +33,17 @@ public final void dispatch(@NotNull Packet packet) {
3133

3234
@Override
3335
public void close() {
34-
looper.stop();
36+
asyncWorker.close();
3537
}
3638

3739
/**
3840
* This method can be overridden to process packet synchronously. This MUST not block for a long period of time.
3941
*/
4042
protected void appendToQueue(@NotNull Packet packet) {
41-
queue.add(packet);
43+
asyncWorker.submit(packet);
4244
}
4345

4446
protected abstract void handle(@NotNull Packet packet) throws IOException;
4547

4648
protected abstract void exception(@NotNull Exception ex);
47-
48-
private final class Looper implements Runnable {
49-
private volatile boolean shouldStop = false;
50-
51-
@Override
52-
public void run() {
53-
while (!shouldStop) {
54-
try {
55-
Packet packet = queue.take();
56-
executorService.execute(() -> {
57-
try {
58-
handle(packet);
59-
} catch (IOException ex) {
60-
exception(ex);
61-
}
62-
});
63-
} catch (InterruptedException ignored) {
64-
}
65-
}
66-
}
67-
68-
void stop() {
69-
shouldStop = true;
70-
}
71-
}
7249
}

core/src/main/java/xyz/gianlu/librespot/dealer/DealerClient.java

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,18 @@
1111
import org.jetbrains.annotations.NotNull;
1212
import org.jetbrains.annotations.Nullable;
1313
import xyz.gianlu.librespot.BytesArrayList;
14+
import xyz.gianlu.librespot.common.AsyncWorker;
1415
import xyz.gianlu.librespot.common.NameThreadFactory;
1516
import xyz.gianlu.librespot.core.ApResolver;
1617
import xyz.gianlu.librespot.core.Session;
1718
import xyz.gianlu.librespot.mercury.MercuryClient;
1819

1920
import java.io.*;
2021
import java.util.*;
21-
import java.util.concurrent.*;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.ScheduledFuture;
25+
import java.util.concurrent.TimeUnit;
2226
import java.util.concurrent.atomic.AtomicReference;
2327
import java.util.zip.GZIPInputStream;
2428

@@ -27,7 +31,7 @@
2731
*/
2832
public class DealerClient implements Closeable {
2933
private static final Logger LOGGER = Logger.getLogger(DealerClient.class);
30-
private final Looper looper = new Looper();
34+
private final AsyncWorker<Runnable> asyncWorker;
3135
private final Session session;
3236
private final Map<String, RequestListener> reqListeners = new HashMap<>();
3337
private final Map<MessageListener, List<String>> msgListeners = new HashMap<>();
@@ -37,7 +41,7 @@ public class DealerClient implements Closeable {
3741

3842
public DealerClient(@NotNull Session session) {
3943
this.session = session;
40-
new Thread(looper, "dealer-looper").start();
44+
this.asyncWorker = new AsyncWorker<>("dealer-worker", Runnable::run);
4145
}
4246

4347
@NotNull
@@ -98,7 +102,7 @@ private void handleRequest(@NotNull JsonObject obj) {
98102
if (mid.startsWith(midPrefix)) {
99103
RequestListener listener = reqListeners.get(midPrefix);
100104
interesting = true;
101-
looper.submit(() -> {
105+
asyncWorker.submit(() -> {
102106
RequestResult result = listener.onRequest(mid, pid, sender, command);
103107
conn.get().sendReply(key, result);
104108
LOGGER.debug(String.format("Handled request. {key: %s, result: %s}", key, result));
@@ -156,7 +160,7 @@ private void handleMessage(@NotNull JsonObject obj) {
156160
for (String key : keys) {
157161
if (uri.startsWith(key) && !dispatched) {
158162
interesting = true;
159-
looper.submit(() -> {
163+
asyncWorker.submit(() -> {
160164
try {
161165
listener.onMessage(uri, headers, decodedPayload);
162166
} catch (IOException ex) {
@@ -217,6 +221,7 @@ public void close() {
217221
lastScheduledReconnection = null;
218222
}
219223

224+
asyncWorker.close();
220225
scheduler.shutdown();
221226
msgListeners.clear();
222227
}
@@ -259,31 +264,6 @@ public interface MessageListener {
259264
void onMessage(@NotNull String uri, @NotNull Map<String, String> headers, @NotNull byte[] payload) throws IOException;
260265
}
261266

262-
private static class Looper implements Runnable, Closeable {
263-
private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
264-
private boolean shouldStop = false;
265-
266-
void submit(@NotNull Runnable task) {
267-
tasks.add(task);
268-
}
269-
270-
@Override
271-
public void run() {
272-
while (!shouldStop) {
273-
try {
274-
tasks.take().run();
275-
} catch (InterruptedException ex) {
276-
break;
277-
}
278-
}
279-
}
280-
281-
@Override
282-
public void close() {
283-
shouldStop = true;
284-
}
285-
}
286-
287267
private class ConnectionHolder implements Closeable {
288268
private final WebSocket ws;
289269
private boolean closed = false;

core/src/main/java/xyz/gianlu/librespot/mercury/MercuryClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public final class MercuryClient extends PacketsManager {
3737
private final Map<Long, BytesArrayList> partials = new HashMap<>();
3838

3939
public MercuryClient(@NotNull Session session) {
40-
super(session);
40+
super(session, "mercury");
4141
}
4242

4343
public void subscribe(@NotNull String uri, @NotNull SubListener listener) throws IOException, PubSubException {

core/src/main/java/xyz/gianlu/librespot/player/AudioKeyManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public final class AudioKeyManager extends PacketsManager {
2929
private final Map<Integer, Callback> callbacks = Collections.synchronizedMap(new HashMap<>());
3030

3131
public AudioKeyManager(@NotNull Session session) {
32-
super(session);
32+
super(session, "audio-keys");
3333
}
3434

3535
@NotNull

0 commit comments

Comments
 (0)