2323import java .util .concurrent .ScheduledExecutorService ;
2424import java .util .concurrent .ScheduledFuture ;
2525import java .util .concurrent .TimeUnit ;
26- import java .util .concurrent .atomic .AtomicReference ;
2726import java .util .zip .GZIPInputStream ;
2827
2928/**
@@ -36,7 +35,7 @@ public class DealerClient implements Closeable {
3635 private final Map <String , RequestListener > reqListeners = new HashMap <>();
3736 private final Map <MessageListener , List <String >> msgListeners = new HashMap <>();
3837 private final ScheduledExecutorService scheduler = Executors .newSingleThreadScheduledExecutor (new NameThreadFactory ((r ) -> "dealer-scheduler-" + r .hashCode ()));
39- private final AtomicReference < ConnectionHolder > conn = new AtomicReference <>() ;
38+ private volatile ConnectionHolder conn = null ;
4039 private ScheduledFuture <?> lastScheduledReconnection ;
4140
4241 public DealerClient (@ NotNull Session session ) {
@@ -57,10 +56,10 @@ private static Map<String, String> getHeaders(@NotNull JsonObject obj) {
5756 /**
5857 * Creates a new WebSocket client. <b>Intended for internal use only!</b>
5958 */
60- public void connect () throws IOException , MercuryClient .MercuryException {
61- conn . set ( new ConnectionHolder (session , new Request .Builder ()
59+ public synchronized void connect () throws IOException , MercuryClient .MercuryException {
60+ conn = new ConnectionHolder (session , new Request .Builder ()
6261 .url (String .format ("wss://%s/?access_token=%s" , ApResolver .getRandomDealer (), session .tokens ().get ("playlist-read" )))
63- .build ())) ;
62+ .build ());
6463 }
6564
6665 private void waitForListeners () {
@@ -104,7 +103,7 @@ private void handleRequest(@NotNull JsonObject obj) {
104103 interesting = true ;
105104 asyncWorker .submit (() -> {
106105 RequestResult result = listener .onRequest (mid , pid , sender , command );
107- conn . get () .sendReply (key , result );
106+ if ( conn != null ) conn .sendReply (key , result );
108107 LOGGER .debug (String .format ("Handled request. {key: %s, result: %s}" , key , result ));
109108 });
110109 }
@@ -210,9 +209,9 @@ public void removeRequestListener(@NotNull RequestListener listener) {
210209
211210 @ Override
212211 public void close () {
213- if (conn . get () != null ) {
214- ConnectionHolder tmp = conn . get () ; // Do not trigger connectionInvalided()
215- conn . set ( null ) ;
212+ if (conn != null ) {
213+ ConnectionHolder tmp = conn ; // Do not trigger connectionInvalided()
214+ conn = null ;
216215 tmp .close ();
217216 }
218217
@@ -229,11 +228,11 @@ public void close() {
229228 /**
230229 * Called when the {@link ConnectionHolder} has been closed and cannot be used no more for a connection.
231230 */
232- private void connectionInvalided () {
231+ private synchronized void connectionInvalided () {
233232 if (lastScheduledReconnection != null && !lastScheduledReconnection .isDone ())
234233 throw new IllegalStateException ();
235234
236- conn . set ( null ) ;
235+ conn = null ;
237236
238237 LOGGER .trace ("Scheduled reconnection attempt in 10 seconds..." );
239238 lastScheduledReconnection = scheduler .schedule (() -> {
@@ -297,13 +296,10 @@ public void close() {
297296 lastScheduledPing = null ;
298297 }
299298
300- ConnectionHolder holder = conn .get ();
301- if (holder == null ) return ;
302-
303- if (holder == ConnectionHolder .this )
299+ if (conn == ConnectionHolder .this )
304300 connectionInvalided ();
305301 else
306- LOGGER .debug (String .format ("Did not dispatch connection invalidated: %s != %s" , holder , ConnectionHolder .this ));
302+ LOGGER .debug (String .format ("Did not dispatch connection invalidated: %s != %s" , conn , ConnectionHolder .this ));
307303 }
308304
309305 private class WebSocketListenerImpl extends WebSocketListener {
@@ -343,10 +339,18 @@ public void onMessage(@NotNull WebSocket ws, @NotNull String text) {
343339 MessageType type = MessageType .parse (obj .get ("type" ).getAsString ());
344340 switch (type ) {
345341 case MESSAGE :
346- handleMessage (obj );
342+ try {
343+ handleMessage (obj );
344+ } catch (Exception ex ) {
345+ LOGGER .warn ("Failed handling message: " + obj , ex );
346+ }
347347 break ;
348348 case REQUEST :
349- handleRequest (obj );
349+ try {
350+ handleRequest (obj );
351+ } catch (Exception ex ) {
352+ LOGGER .warn ("Failed handling request: " + obj , ex );
353+ }
350354 break ;
351355 case PONG :
352356 receivedPong = true ;
0 commit comments