Skip to content

Commit 2e8db57

Browse files
funtaxma5309devgianlu
authored
Close Threads/Executors on Session.close() (#209)
* Close Threads/Executors on Session.close() * Minor refactoring + moved thread initialization for Session.Receiver + shutdown executor in ChannelManager * Removed TODO Co-authored-by: ma5309 <[email protected]> Co-authored-by: Gianlu <[email protected]>
1 parent 1e8e103 commit 2e8db57

8 files changed

Lines changed: 66 additions & 14 deletions

File tree

common/src/main/java/xyz/gianlu/librespot/common/AsyncWorker.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public void close() {
3737

3838
@Override
3939
public void run() {
40+
LOGGER.trace(String.format("AsyncWorker{%s} is starting", name));
41+
4042
while (running) {
4143
try {
4244
T polled = internalQueue.take();

core/src/main/java/xyz/gianlu/librespot/core/PacketsManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@
44
import xyz.gianlu.librespot.common.AsyncWorker;
55
import xyz.gianlu.librespot.crypto.Packet;
66

7+
import java.io.Closeable;
78
import java.io.IOException;
89
import java.util.concurrent.ExecutorService;
910

1011
/**
1112
* @author Gianlu
1213
*/
13-
public abstract class PacketsManager implements AutoCloseable {
14+
public abstract class PacketsManager implements Closeable {
1415
protected final Session session;
1516
private final ExecutorService executorService;
1617
private final AsyncWorker<Packet> asyncWorker;

core/src/main/java/xyz/gianlu/librespot/core/Session.java

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ private void authenticatePartial(@NotNull Authentication.LoginCredentials creden
383383
apWelcome = Authentication.APWelcome.parseFrom(packet.payload);
384384

385385
receiver = new Receiver();
386-
new Thread(receiver, "session-packet-receiver").start();
386+
387387

388388
byte[] bytes0x0f = new byte[20];
389389
random().nextBytes(bytes0x0f);
@@ -426,6 +426,9 @@ private void authenticatePartial(@NotNull Authentication.LoginCredentials creden
426426

427427
@Override
428428
public void close() throws IOException {
429+
LOGGER.info(String.format("Closing session. {deviceId: %s} ", inner.deviceId));
430+
scheduler.shutdownNow();
431+
429432
if (receiver != null) {
430433
receiver.stop();
431434
receiver = null;
@@ -457,7 +460,11 @@ public void close() throws IOException {
457460
}
458461

459462
executorService.shutdown();
460-
conn.socket.close();
463+
464+
if (conn != null) {
465+
conn.socket.close();
466+
conn = null;
467+
}
461468

462469
synchronized (authLock) {
463470
apWelcome = null;
@@ -473,7 +480,7 @@ public void close() throws IOException {
473480
}
474481
}
475482

476-
LOGGER.info(String.format("Closed session. {deviceId: %s, ap: %s} ", inner.deviceId, conn.socket.getInetAddress()));
483+
LOGGER.info(String.format("Closed session. {deviceId: %s} ", inner.deviceId));
477484
}
478485

479486
private void sendUnchecked(Packet.Type cmd, byte[] payload) throws IOException {
@@ -662,7 +669,12 @@ private void reconnect() {
662669
} catch (IOException | GeneralSecurityException | SpotifyAuthenticationException ex) {
663670
conn = null;
664671
LOGGER.error("Failed reconnecting, retrying in 10 seconds...", ex);
665-
scheduler.schedule(this::reconnect, 10, TimeUnit.SECONDS);
672+
673+
try {
674+
scheduler.schedule(this::reconnect, 10, TimeUnit.SECONDS);
675+
} catch (RejectedExecutionException exx) {
676+
LOGGER.info("Scheduler already shutdown, stopping reconnection", exx);
677+
}
666678
}
667679
}
668680

@@ -1042,18 +1054,24 @@ protected PasswordAuthentication getPasswordAuthentication() {
10421054
}
10431055

10441056
private class Receiver implements Runnable {
1045-
private volatile boolean shouldStop = false;
1057+
private final Thread thread;
1058+
private volatile boolean running = true;
10461059

10471060
private Receiver() {
1061+
thread = new Thread(this, "session-packet-receiver");
1062+
thread.start();
10481063
}
10491064

10501065
void stop() {
1051-
shouldStop = true;
1066+
running = false;
1067+
thread.interrupt();
10521068
}
10531069

10541070
@Override
10551071
public void run() {
1056-
while (!shouldStop) {
1072+
LOGGER.trace("Session.Receiver started");
1073+
1074+
while (running) {
10571075
Packet packet;
10581076
Packet.Type cmd;
10591077
try {
@@ -1064,15 +1082,15 @@ public void run() {
10641082
continue;
10651083
}
10661084
} catch (IOException | GeneralSecurityException ex) {
1067-
if (!shouldStop) {
1085+
if (running) {
10681086
LOGGER.fatal("Failed reading packet!", ex);
10691087
reconnect();
10701088
}
10711089

1072-
return;
1090+
break;
10731091
}
10741092

1075-
if (shouldStop) return;
1093+
if (!running) break;
10761094

10771095
switch (cmd) {
10781096
case Ping:
@@ -1137,6 +1155,8 @@ public void run() {
11371155
break;
11381156
}
11391157
}
1158+
1159+
LOGGER.trace("Session.Receiver stopped");
11401160
}
11411161
}
11421162
}

core/src/main/java/xyz/gianlu/librespot/core/ZeroconfServer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ private void handle(@NotNull Socket socket) throws IOException {
475475
public void close() throws IOException {
476476
shouldStop = true;
477477
serverSocket.close();
478+
executorService.shutdown();
478479
}
479480
}
480481
}

core/src/main/java/xyz/gianlu/librespot/dealer/DealerClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ public void removeRequestListener(@NotNull RequestListener listener) {
209209

210210
@Override
211211
public void close() {
212+
asyncWorker.close();
213+
scheduler.shutdown();
214+
212215
if (conn != null) {
213216
ConnectionHolder tmp = conn; // Do not trigger connectionInvalided()
214217
conn = null;
@@ -220,8 +223,6 @@ public void close() {
220223
lastScheduledReconnection = null;
221224
}
222225

223-
asyncWorker.close();
224-
scheduler.shutdown();
225226
msgListeners.clear();
226227
}
227228

core/src/main/java/xyz/gianlu/librespot/player/Player.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,9 @@ public void close() throws IOException {
646646

647647
runner.close();
648648
if (state != null) state.removeListener(this);
649+
650+
scheduler.shutdown();
651+
events.close();
649652
}
650653

651654
@Nullable
@@ -983,5 +986,9 @@ void inactiveSession(boolean timeout) {
983986
for (EventsListener l : new ArrayList<>(listeners))
984987
executorService.execute(() -> l.onInactiveSession(timeout));
985988
}
989+
990+
public void close() {
991+
executorService.shutdown();
992+
}
986993
}
987994
}

core/src/main/java/xyz/gianlu/librespot/player/PlayerRunner.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ TrackHandler load(@NotNull PlayableId playable, int pos) {
150150

151151
@Override
152152
public void run() {
153+
LOGGER.trace("PlayerRunner is starting");
154+
153155
byte[] buffer = new byte[Codec.BUFFER_SIZE * 2];
154156

155157
boolean started = false;
@@ -185,6 +187,8 @@ public void run() {
185187
output.close();
186188
} catch (IOException ignored) {
187189
}
190+
191+
LOGGER.trace("PlayerRunner is shutting down");
188192
}
189193

190194
@Override
@@ -746,6 +750,8 @@ void abortCrossfade() {
746750

747751
@Override
748752
public void run() {
753+
LOGGER.trace("PlayerRunner.TrackHandler is starting");
754+
749755
waitReady();
750756

751757
int seekTo = -1;
@@ -789,6 +795,8 @@ public void run() {
789795
}
790796

791797
close();
798+
799+
LOGGER.trace("PlayerRunner.TrackHandler is shutting down");
792800
}
793801

794802
boolean isInMixer() {

core/src/main/java/xyz/gianlu/librespot/player/feeders/storage/ChannelManager.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ protected void exception(@NotNull Exception ex) {
9393
LOGGER.fatal("Failed handling packet!", ex);
9494
}
9595

96+
@Override
97+
public void close() {
98+
executorService.shutdown();
99+
super.close();
100+
}
101+
96102
public class Channel {
97103
public final short id;
98104
private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>();
@@ -160,16 +166,22 @@ private class Handler implements Runnable {
160166

161167
@Override
162168
public void run() {
169+
LOGGER.trace("ChannelManager.Handler is starting");
170+
163171
while (true) {
164172
try {
165173
if (handle(queue.take())) {
166174
channels.remove(id);
167175
break;
168176
}
169-
} catch (InterruptedException | IOException ex) {
177+
} catch (IOException ex) {
170178
LOGGER.fatal("Failed handling packet!", ex);
179+
} catch (InterruptedException ex) {
180+
break;
171181
}
172182
}
183+
184+
LOGGER.trace("ChannelManager.Handler is shutting down");
173185
}
174186
}
175187
}

0 commit comments

Comments
 (0)