Skip to content

Commit 0d65693

Browse files
committed
Making PacketsManager (PacketsReceiver) sync
1 parent 8170def commit 0d65693

6 files changed

Lines changed: 33 additions & 97 deletions

File tree

lib/src/main/java/xyz/gianlu/librespot/audio/AudioKeyManager.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import org.jetbrains.annotations.NotNull;
77
import org.jetbrains.annotations.Nullable;
88
import xyz.gianlu.librespot.common.Utils;
9-
import xyz.gianlu.librespot.core.PacketsManager;
9+
import xyz.gianlu.librespot.core.PacketsReceiver;
1010
import xyz.gianlu.librespot.core.Session;
1111
import xyz.gianlu.librespot.crypto.Packet;
1212

@@ -22,15 +22,16 @@
2222
/**
2323
* @author Gianlu
2424
*/
25-
public final class AudioKeyManager extends PacketsManager {
25+
public final class AudioKeyManager implements PacketsReceiver {
2626
private static final byte[] ZERO_SHORT = new byte[]{0, 0};
2727
private static final Logger LOGGER = LogManager.getLogger(AudioKeyManager.class);
2828
private static final long AUDIO_KEY_REQUEST_TIMEOUT = 2000;
2929
private final AtomicInteger seqHolder = new AtomicInteger(0);
3030
private final Map<Integer, Callback> callbacks = Collections.synchronizedMap(new HashMap<>());
31+
private final Session session;
3132

3233
public AudioKeyManager(@NotNull Session session) {
33-
super(session, "audio-keys");
34+
this.session = session;
3435
}
3536

3637
@NotNull
@@ -67,7 +68,7 @@ else throw new AesKeyException(String.format("Failed fetching audio key! {gid: %
6768
}
6869

6970
@Override
70-
protected void handle(@NotNull Packet packet) {
71+
public void dispatch(@NotNull Packet packet) {
7172
ByteBuffer payload = ByteBuffer.wrap(packet.payload);
7273
int seq = payload.getInt();
7374

@@ -89,11 +90,6 @@ protected void handle(@NotNull Packet packet) {
8990
}
9091
}
9192

92-
@Override
93-
protected void exception(@NotNull Exception ex) {
94-
LOGGER.fatal("Failed handling packet!", ex);
95-
}
96-
9793
private interface Callback {
9894
void key(byte[] key);
9995

lib/src/main/java/xyz/gianlu/librespot/audio/storage/ChannelManager.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
import org.jetbrains.annotations.NotNull;
77
import xyz.gianlu.librespot.common.NameThreadFactory;
88
import xyz.gianlu.librespot.common.Utils;
9-
import xyz.gianlu.librespot.core.PacketsManager;
9+
import xyz.gianlu.librespot.core.PacketsReceiver;
1010
import xyz.gianlu.librespot.core.Session;
1111
import xyz.gianlu.librespot.crypto.Packet;
1212

1313
import java.io.ByteArrayOutputStream;
14+
import java.io.Closeable;
1415
import java.io.DataOutputStream;
1516
import java.io.IOException;
1617
import java.nio.ByteBuffer;
@@ -25,15 +26,16 @@
2526
/**
2627
* @author Gianlu
2728
*/
28-
public class ChannelManager extends PacketsManager {
29+
public class ChannelManager implements Closeable, PacketsReceiver {
2930
public static final int CHUNK_SIZE = 128 * 1024;
3031
private static final Logger LOGGER = LogManager.getLogger(ChannelManager.class);
3132
private final Map<Short, Channel> channels = new HashMap<>();
3233
private final AtomicInteger seqHolder = new AtomicInteger(0);
3334
private final ExecutorService executorService = Executors.newCachedThreadPool(new NameThreadFactory(r -> "channel-queue-" + r.hashCode()));
35+
private final Session session;
3436

3537
public ChannelManager(@NotNull Session session) {
36-
super(session, "channels");
38+
this.session = session;
3739
}
3840

3941
void requestChunk(@NotNull ByteString fileId, int index, @NotNull AudioFile file) throws IOException {
@@ -59,12 +61,7 @@ void requestChunk(@NotNull ByteString fileId, int index, @NotNull AudioFile file
5961
}
6062

6163
@Override
62-
protected void handle(@NotNull Packet packet) {
63-
throw new UnsupportedOperationException();
64-
}
65-
66-
@Override
67-
protected void appendToQueue(@NotNull Packet packet) {
64+
public void dispatch(@NotNull Packet packet) {
6865
ByteBuffer payload = ByteBuffer.wrap(packet.payload);
6966
if (packet.is(Packet.Type.StreamChunkRes)) {
7067
short id = payload.getShort();
@@ -89,15 +86,9 @@ protected void appendToQueue(@NotNull Packet packet) {
8986
}
9087
}
9188

92-
@Override
93-
protected void exception(@NotNull Exception ex) {
94-
LOGGER.fatal("Failed handling packet!", ex);
95-
}
96-
9789
@Override
9890
public void close() {
9991
executorService.shutdown();
100-
super.close();
10192
}
10293

10394
public class Channel {

lib/src/main/java/xyz/gianlu/librespot/core/PacketsManager.java

Lines changed: 0 additions & 50 deletions
This file was deleted.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package xyz.gianlu.librespot.core;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
import xyz.gianlu.librespot.crypto.Packet;
5+
6+
/**
7+
* @author Gianlu
8+
*/
9+
public interface PacketsReceiver {
10+
void dispatch(@NotNull Packet packet);
11+
}

lib/src/main/java/xyz/gianlu/librespot/core/Session.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import okio.Okio;
1616
import org.apache.logging.log4j.LogManager;
1717
import org.apache.logging.log4j.Logger;
18-
import org.jetbrains.annotations.Contract;
1918
import org.jetbrains.annotations.NotNull;
2019
import org.jetbrains.annotations.Nullable;
2120
import org.w3c.dom.Document;
@@ -428,7 +427,6 @@ public void close() throws IOException {
428427
}
429428

430429
if (audioKeyManager != null) {
431-
audioKeyManager.close();
432430
audioKeyManager = null;
433431
}
434432

@@ -625,11 +623,6 @@ public boolean reconnecting() {
625623
return !closing && !closed && conn == null;
626624
}
627625

628-
@NotNull
629-
ExecutorService executor() {
630-
return executorService;
631-
}
632-
633626
@Nullable
634627
public String countryCode() {
635628
return countryCode;
@@ -737,7 +730,7 @@ public String getUserAttribute(@NotNull String key) {
737730
return userAttributes.get(key);
738731
}
739732

740-
@Contract("_, !null -> !null")
733+
@NotNull
741734
public String getUserAttribute(@NotNull String key, @NotNull String fallback) {
742735
return userAttributes.getOrDefault(key, fallback);
743736
}
@@ -1366,7 +1359,7 @@ public void run() {
13661359
try {
13671360
parseProductInfo(new ByteArrayInputStream(packet.payload));
13681361
} catch (IOException | ParserConfigurationException | SAXException ex) {
1369-
LOGGER.warn("Failed parsing prodcut info!", ex);
1362+
LOGGER.warn("Failed parsing product info!", ex);
13701363
}
13711364
break;
13721365
default:

lib/src/main/java/xyz/gianlu/librespot/mercury/MercuryClient.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@
1313
import xyz.gianlu.librespot.common.BytesArrayList;
1414
import xyz.gianlu.librespot.common.ProtobufToJson;
1515
import xyz.gianlu.librespot.common.Utils;
16-
import xyz.gianlu.librespot.core.PacketsManager;
16+
import xyz.gianlu.librespot.core.PacketsReceiver;
1717
import xyz.gianlu.librespot.core.Session;
1818
import xyz.gianlu.librespot.crypto.Packet;
1919

2020
import java.io.ByteArrayOutputStream;
21+
import java.io.Closeable;
2122
import java.io.DataOutputStream;
2223
import java.io.IOException;
2324
import java.nio.ByteBuffer;
@@ -28,17 +29,18 @@
2829
/**
2930
* @author Gianlu
3031
*/
31-
public final class MercuryClient extends PacketsManager {
32+
public final class MercuryClient implements PacketsReceiver, Closeable {
3233
private static final Logger LOGGER = LogManager.getLogger(MercuryClient.class);
3334
private static final int MERCURY_REQUEST_TIMEOUT = 3000;
3435
private final AtomicInteger seqHolder = new AtomicInteger(1);
3536
private final Map<Long, Callback> callbacks = Collections.synchronizedMap(new HashMap<>());
3637
private final Object removeCallbackLock = new Object();
3738
private final List<InternalSubListener> subscriptions = Collections.synchronizedList(new ArrayList<>());
3839
private final Map<Long, BytesArrayList> partials = new HashMap<>();
40+
private final Session session;
3941

4042
public MercuryClient(@NotNull Session session) {
41-
super(session, "mercury");
43+
this.session = session;
4244
}
4345

4446
public void subscribe(@NotNull String uri, @NotNull SubListener listener) throws IOException, PubSubException {
@@ -160,7 +162,7 @@ public int send(@NotNull RawMercuryRequest request, @NotNull Callback callback)
160162
}
161163

162164
@Override
163-
protected void handle(@NotNull Packet packet) throws InvalidProtocolBufferException {
165+
public void dispatch(@NotNull Packet packet) {
164166
ByteBuffer payload = ByteBuffer.wrap(packet.payload);
165167
int seqLength = payload.getShort();
166168
long seq;
@@ -196,7 +198,7 @@ protected void handle(@NotNull Packet packet) throws InvalidProtocolBufferExcept
196198
header = Mercury.Header.parseFrom(partial.get(0));
197199
} catch (InvalidProtocolBufferException ex) {
198200
LOGGER.fatal("Couldn't parse header! {bytes: {}}", Utils.bytesToHex(partial.get(0)));
199-
throw ex;
201+
return;
200202
}
201203

202204
Response resp = new Response(header, partial);
@@ -216,11 +218,10 @@ protected void handle(@NotNull Packet packet) throws InvalidProtocolBufferExcept
216218
LOGGER.debug("Couldn't dispatch Mercury event {seq: {}, uri: {}, code: {}, payload: {}}", seq, header.getUri(), header.getStatusCode(), resp.payload.toHex());
217219
} else if (packet.is(Packet.Type.MercuryReq) || packet.is(Packet.Type.MercurySub) || packet.is(Packet.Type.MercuryUnsub)) {
218220
Callback callback = callbacks.remove(seq);
219-
if (callback != null) {
221+
if (callback != null)
220222
callback.response(resp);
221-
} else {
223+
else
222224
LOGGER.warn("Skipped Mercury response, seq: {}, uri: {}, code: {}", seq, header.getUri(), header.getStatusCode());
223-
}
224225

225226
synchronized (removeCallbackLock) {
226227
removeCallbackLock.notifyAll();
@@ -230,11 +231,6 @@ protected void handle(@NotNull Packet packet) throws InvalidProtocolBufferExcept
230231
}
231232
}
232233

233-
@Override
234-
protected void exception(@NotNull Exception ex) {
235-
LOGGER.fatal("Failed handling packet!", ex);
236-
}
237-
238234
public void interestedIn(@NotNull String uri, @NotNull SubListener listener) {
239235
subscriptions.add(new InternalSubListener(uri, listener, false));
240236
}
@@ -266,7 +262,6 @@ public void close() {
266262
}
267263

268264
callbacks.clear();
269-
super.close();
270265
}
271266

272267
public interface JsonCallback<W extends JsonWrapper> {

0 commit comments

Comments
 (0)