11package xyz .gianlu .librespot .player ;
22
33import org .jetbrains .annotations .NotNull ;
4- import org .jetbrains .annotations .Nullable ;
54
65import java .io .IOException ;
76import java .io .InputStream ;
8- import java .util .concurrent .atomic .AtomicInteger ;
97
108import static xyz .gianlu .librespot .player .feeders .storage .ChannelManager .CHUNK_SIZE ;
119
1210/**
1311 * @author Gianlu
1412 */
15- public abstract class AbsChunckedInputStream extends InputStream {
16- private static final int PRELOAD_AHEAD = 3 ;
17- private final AtomicInteger waitForChunk = new AtomicInteger (-1 );
18- private final HaltListener haltListener ;
19- private ChunkException chunkException = null ;
13+ public abstract class AbsChunkedInputStream extends InputStream implements HaltListener {
14+ public static final int PRELOAD_AHEAD = 3 ;
15+ private static final int PRELOAD_CHUNK_RETRIES = 2 ;
16+ private static final int MAX_CHUNK_TRIES = 128 ;
17+ private final Object waitLock = new Object ();
18+ private final int [] retries ;
19+ private final boolean stopPlaybackOnChunkError ;
20+ private volatile int waitForChunk = -1 ;
21+ private volatile ChunkException chunkException = null ;
2022 private int pos = 0 ;
2123 private int mark = 0 ;
2224 private volatile boolean closed = false ;
2325
24- protected AbsChunckedInputStream (@ Nullable HaltListener haltListener ) {
25- this .haltListener = haltListener ;
26+ protected AbsChunkedInputStream (@ NotNull Player .Configuration conf ) {
27+ this .retries = new int [chunks ()];
28+ this .stopPlaybackOnChunkError = conf .stopPlaybackOnChunkError ();
2629 }
2730
2831 public final boolean isClosed () {
@@ -37,8 +40,8 @@ public final boolean isClosed() {
3740 public void close () {
3841 closed = true ;
3942
40- synchronized (waitForChunk ) {
41- waitForChunk .notifyAll ();
43+ synchronized (waitLock ) {
44+ waitLock .notifyAll ();
4245 }
4346 }
4447
@@ -71,33 +74,11 @@ public final synchronized long skip(long n) throws IOException {
7174 pos += k ;
7275
7376 int chunk = pos / CHUNK_SIZE ;
74- checkAvailability (chunk , false );
77+ checkAvailability (chunk , false , false );
7578
7679 return k ;
7780 }
7881
79- public void waitFor (int chunkIndex ) throws IOException {
80- if (isClosed ()) return ;
81-
82- if (availableChunks ()[chunkIndex ]) return ;
83-
84- synchronized (waitForChunk ) {
85- try {
86- chunkException = null ;
87-
88- waitForChunk .set (chunkIndex );
89- waitForChunk .wait ();
90-
91- if (closed ) return ;
92-
93- if (chunkException != null )
94- throw chunkException ;
95- } catch (InterruptedException ex ) {
96- throw new IOException (ex );
97- }
98- }
99- }
100-
10182 protected abstract boolean [] requestedChunks ();
10283
10384 protected abstract boolean [] availableChunks ();
@@ -109,25 +90,74 @@ public void waitFor(int chunkIndex) throws IOException {
10990 */
11091 protected abstract void requestChunkFromStream (int index );
11192
112- private void checkAvailability (int chunk , boolean wait ) throws IOException {
93+ /**
94+ * Should we retry fetching this chunk? MUST be called only for chunks that are needed immediately ({@code wait = true})!
95+ *
96+ * @param chunk The chunk index
97+ * @return Whether we should retry.
98+ */
99+ private boolean shouldRetry (int chunk ) {
100+ if (retries [chunk ] < 1 ) return true ;
101+ if (retries [chunk ] > MAX_CHUNK_TRIES ) return false ;
102+ return !stopPlaybackOnChunkError ;
103+ }
104+
105+ /**
106+ * Chunk if {@param chunk} is available or wait until it becomes, also handles the retry mechanism.
107+ *
108+ * @param chunk The chunk index
109+ * @param wait Whether we should wait for {@param chunk} to be available
110+ * @param halted Whether we have already notified that the retrieving of this chunk is halted
111+ * @throws IOException If we fail to retrieve this chunk and no more retries are available
112+ */
113+ private void checkAvailability (int chunk , boolean wait , boolean halted ) throws IOException {
114+ if (halted && !wait ) throw new IllegalArgumentException ();
115+
113116 if (!requestedChunks ()[chunk ]) {
114117 requestChunkFromStream (chunk );
115118 requestedChunks ()[chunk ] = true ;
116119 }
117120
118121 for (int i = chunk + 1 ; i <= Math .min (chunks () - 1 , chunk + PRELOAD_AHEAD ); i ++) {
119- if (!requestedChunks ()[i ]) {
122+ if (!requestedChunks ()[i ] && retries [ i ] < PRELOAD_CHUNK_RETRIES ) {
120123 requestChunkFromStream (i );
121124 requestedChunks ()[i ] = true ;
122125 }
123126 }
124127
125- if (availableChunks ()[chunk ]) return ;
126-
127128 if (wait ) {
128- if (haltListener != null ) haltListener .streamReadHalted (chunk , System .currentTimeMillis ());
129- waitFor (chunk );
130- if (haltListener != null ) haltListener .streamReadResumed (chunk , System .currentTimeMillis ());
129+ if (availableChunks ()[chunk ]) return ;
130+
131+ boolean retry = false ;
132+ synchronized (waitLock ) {
133+ if (!halted ) streamReadHalted (chunk , System .currentTimeMillis ());
134+
135+ try {
136+ chunkException = null ;
137+ waitForChunk = chunk ;
138+ waitLock .wait ();
139+
140+ if (closed ) return ;
141+
142+ if (chunkException != null ) {
143+ if (shouldRetry (chunk )) retry = true ;
144+ else throw chunkException ;
145+ }
146+ } catch (InterruptedException ex ) {
147+ throw new IOException (ex );
148+ }
149+
150+ if (!retry ) streamReadResumed (chunk , System .currentTimeMillis ());
151+ }
152+
153+ if (retry ) {
154+ try {
155+ Thread .sleep ((long ) (Math .log10 (retries [chunk ]) * 1000 ));
156+ } catch (InterruptedException ignored ) {
157+ }
158+
159+ checkAvailability (chunk , true , true ); // We must exit the synchronized block!
160+ }
131161 }
132162 }
133163
@@ -149,7 +179,7 @@ public final int read(@NotNull byte[] b, int off, int len) throws IOException {
149179 int chunk = pos / CHUNK_SIZE ;
150180 int chunkOff = pos % CHUNK_SIZE ;
151181
152- checkAvailability (chunk , true );
182+ checkAvailability (chunk , true , false );
153183
154184 int copy = Math .min (buffer ()[chunk ].length - chunkOff , len - i );
155185 System .arraycopy (buffer ()[chunk ], chunkOff , b , off + i , copy );
@@ -169,41 +199,36 @@ public final synchronized int read() throws IOException {
169199 return -1 ;
170200
171201 int chunk = pos / CHUNK_SIZE ;
172- checkAvailability (chunk , true );
202+ checkAvailability (chunk , true , false );
173203
174204 return buffer ()[chunk ][pos ++ % CHUNK_SIZE ] & 0xff ;
175205 }
176206
177207 public final void notifyChunkAvailable (int index ) {
178208 availableChunks ()[index ] = true ;
179209
180- if ( index == waitForChunk . get () && ! closed ) {
181- synchronized ( waitForChunk ) {
182- waitForChunk . set (- 1 ) ;
183- waitForChunk .notifyAll ();
210+ synchronized ( waitLock ) {
211+ if ( index == waitForChunk && ! closed ) {
212+ waitForChunk = - 1 ;
213+ waitLock .notifyAll ();
184214 }
185215 }
186216 }
187217
188218 public final void notifyChunkError (int index , @ NotNull ChunkException ex ) {
189219 availableChunks ()[index ] = false ;
190220 requestedChunks ()[index ] = false ;
221+ retries [index ] += 1 ;
191222
192- if ( index == waitForChunk . get () && ! closed ) {
193- synchronized ( waitForChunk ) {
223+ synchronized ( waitLock ) {
224+ if ( index == waitForChunk && ! closed ) {
194225 chunkException = ex ;
195- waitForChunk . set (- 1 ) ;
196- waitForChunk .notifyAll ();
226+ waitForChunk = - 1 ;
227+ waitLock .notifyAll ();
197228 }
198229 }
199230 }
200231
201- public interface HaltListener {
202- void streamReadHalted (int chunk , long time );
203-
204- void streamReadResumed (int chunk , long time );
205- }
206-
207232 public static class ChunkException extends IOException {
208233 public ChunkException (@ NotNull Throwable cause ) {
209234 super (cause );
0 commit comments