SOLR-17208: Parallelize backup and restore file transfers #4023

elangelo wants to merge 12 commits into apache:main from
Conversation
…to 1 thread, allow overriding with system properties
epugh
left a comment
There was a problem hiding this comment.
I don't have the multi-thread chops to approve this, but reading through it looks good. I wanted a change to the variable name. Do we need any new tests for this capability, or do the existing ones cover it well enough?
I think the current tests actually cover everything already. Mind that I did change the GCS repository and S3 repository tests to have some parallelism. Unfortunately I was limited to only 2 threads, as with more I got an OutOfMemoryException. But I think it still covers what needs covering.
epugh
left a comment
LGTM. I'd love another committer who is more comfortable with this code base and especially the multithreaded nature of it to review as well.
… would be such a bottleneck
…was referred to by the non-canonical name `ExecutorUtil.MDCAwareThreadPoolExecutor.CallerRunsPolicy`
This PR has had no activity for 60 days and is now labeled as stale. Any new activity will remove the stale label. To attract more reviewers, please tag people who might be familiar with the code area and/or notify the [email protected] mailing list. To exempt this PR from being marked as stale, make it a draft PR or add the label "exempt-stale". If left unattended, this PR will be closed after another 60 days of inactivity. Thank you for your contribution!
Pull request overview
This PR adds configurable parallelism to Solr’s backup (incremental shard backup) and restore (core restore) file-transfer loops to improve throughput, especially for higher-latency cloud repositories (e.g., S3/GCS).
Changes:
- Add parallel upload/download execution for index file transfers during backup and restore, gated by new sysprop/env settings.
- Document the new parallel transfer settings in the ref guide.
- Update S3/GCS incremental backup tests to enable parallelism and add an unreleased changelog entry.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc | Documents new parallel upload/download properties and tuning guidance. |
| solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java | Enables parallel backup/restore via sysprops for S3 incremental backup tests. |
| solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java | Enables parallel backup/restore via sysprops for GCS incremental backup tests. |
| solr/core/src/java/org/apache/solr/handler/RestoreCore.java | Parallelizes restore file copy/download work via an executor and aggregates errors. |
| solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java | Parallelizes incremental backup upload work, makes stats thread-safe, aggregates errors. |
| changelog/unreleased/parallelizebackups.yml | Adds changelog entry for the feature. |
    authors:
      - name: Samuel Verstraete
        github: elangelo
The changelog author metadata uses a github field, but this repository’s changelog format documentation uses nick (optionally with url) under authors. Using an unexpected key may fail changelog validation or omit author info; please switch github: elangelo to nick: elangelo (and add url if desired).
        60L,
        TimeUnit.SECONDS,
        new SynchronousQueue<>(),
        new SolrNamedThreadFactory("RestoreCore"),
        new ThreadPoolExecutor.CallerRunsPolicy())
Using SynchronousQueue with CallerRunsPolicy means once all maxParallelDownloads threads are busy, additional downloads will execute on the calling thread. That can exceed the configured cap (up to maxParallelDownloads + 1 concurrent transfers) and also bypass the MDCAwareThreadPoolExecutor wrapping for those caller-run tasks. Consider a bounded queue/fixed pool or explicitly limiting in-flight submissions to enforce the configured parallelism.
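For reference, one way to "explicitly limit in-flight submissions" as suggested here is to gate `submit` calls with a semaphore so the configured cap is a hard limit. This is a standalone sketch, not the PR's code; the class and method names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedSubmit {
    // Acquire a permit before submitting so at most maxParallel tasks run at once;
    // the submitting thread blocks (applying backpressure) instead of running tasks itself.
    static void runBounded(ExecutorService pool, int maxParallel, List<Runnable> tasks)
            throws InterruptedException, ExecutionException {
        Semaphore permits = new Semaphore(maxParallel);
        List<Future<?>> futures = new ArrayList<>();
        for (Runnable task : tasks) {
            permits.acquire(); // blocks when maxParallel tasks are already in flight
            futures.add(pool.submit(() -> {
                try {
                    task.run();
                } finally {
                    permits.release();
                }
            }));
        }
        for (Future<?> f : futures) {
            f.get(); // surface the first failure
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        AtomicInteger counter = new AtomicInteger();
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) tasks.add(counter::incrementAndGet);
        runBounded(pool, 2, tasks); // never more than 2 concurrent tasks
        pool.shutdown();
        System.out.println(counter.get());
    }
}
```

With this shape the pool never exceeds the cap and no task runs on the caller, at the cost of the submitting thread blocking on `acquire`.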
The CallerRunsPolicy fallback does mean the submitting thread can run a task when the pool is saturated, but the submitting thread is the Solr request thread — it already carries full MDC context, so there's no MDC loss here. MDCAwareThreadPoolExecutor exists to propagate MDC to new pool threads; the caller-runs case doesn't need that propagation. On the cap concern: the maxParallel* setting is a throughput knob, not a hard safety limit. An occasional N+1 concurrent transfer when the pool is fully busy is negligible for a backup/restore workload.
Not saying this is wrong, and your argument makes sense. It seems like all of our executors in Solr use a LinkedBlockingQueue, and if you do that, we can just set the executor to 1 instead of null and avoid all these executor != null checks through this function.
My main problem with this executor is that it is being created inside this function as a local variable. If I am not mistaken, that means if I do 2 backup/restore calls, I just created 2 thread pools, giving us 2x the size. Someone with many collections backing up with many calls can cause a thread explosion. This should be a global/static executor somewhere else that is shared.
Actually, now that I think of it more, the static pool might make this worse, since if set to 1 then all callers fight for 1 thread. So we should definitely use what you are doing here, but don't create the pool in the call; make it a static pool with ThreadPoolExecutor.CallerRunsPolicy(). Then if the global pool gets saturated, the calling thread takes over instead of creating executors per call.
if set to 1
Then don't do that :-) It's up to the operator. Perhaps 1 could be sensible if the operator wants to ensure that backups/restores minimally contend with real traffic.
I think most of Solr's thread pools (/executors) are basically singleton pools. Why should this one be different?
The ExecutorUtil.newMDCAwareCachedThreadPool overloads document a 60 second TTL for the threads, so there won't be any retained overhead 60 seconds beyond the backups/restores completing.
I have done concurrent backups on a SolrCloud in the past and Solr allows it, so we should assume someone does so unless we explicitly reject it. My argument is that in the current implementation this could result in rapid thread growth, and each call needs to manage the executor's lifecycle. It is harder to control from a user perspective, as it is M * (N + 1) threads (M = number of calls, N = thread pool size) versus N + 1 for a static pool with the CallerRunsPolicy. Also, the arg is called DEFAULT_MAX_PARALLEL_UPLOADS, which to me implies a global max, but right now it is a "max per backup call per collection". What if someone does a backup/restore over a network and they have bandwidth throttling? It could be harder to control right now.
Thanks! I see you dropped CallerRunsPolicy. I kind of liked it so that the calling thread can handle some of the backpressure but I can see an argument for not keeping it as it is harder to control. Also when this gets released, since we set the default to 1, backup/restore will be slow if someone upgrades without knowing this change was implemented and the calling thread isn't doing the backup anymore. @dsmiley wdyt?
I agree on harder to control; lets drop it.
        Throwable cause = e.getCause();
        // Unwrap RuntimeExceptions that wrap the original IOException
        if (cause instanceof RuntimeException && cause.getCause() != null) {
          firstError = cause.getCause();
        } else {
In the parallel path, the RuntimeException unwrapping (firstError = cause.getCause()) can discard the wrapper message that contains the filename. Prefer preserving that message (or re-wrapping the cause with filename context) when propagating the first failure.
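To illustrate "re-wrapping the cause with filename context": the wrapper's message (the PR wraps failures as "Failed to process file: ...") can be carried into the propagated IOException. A standalone sketch with illustrative names:

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class UnwrapWithContext {
    // Unwrap an ExecutionException but keep the wrapper's per-file message
    // by re-wrapping the underlying IOException instead of discarding the wrapper.
    static IOException toIOException(ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof RuntimeException && cause.getCause() instanceof IOException io) {
            // The wrapper message contains the filename; preserve it in the chain.
            return new IOException(cause.getMessage(), io);
        }
        return new IOException("Transfer failed", cause);
    }

    public static void main(String[] args) {
        IOException original = new IOException("connection reset");
        ExecutionException ee = new ExecutionException(
            new RuntimeException("Failed to process file: _0.cfs", original));
        IOException result = toIOException(ee);
        System.out.println(result.getMessage());
        System.out.println(result.getCause().getMessage());
    }
}
```

The resulting exception then reports both which file failed and why.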
        60L,
        TimeUnit.SECONDS,
        new SynchronousQueue<>(),
        new SolrNamedThreadFactory("IncrementalBackup"),
        new ThreadPoolExecutor.CallerRunsPolicy())
With SynchronousQueue + CallerRunsPolicy, once maxParallelUploads threads are busy, additional uploads will run on the submitting thread. That can exceed the configured cap (up to maxParallelUploads + 1 concurrent uploads) and bypass MDCAwareThreadPoolExecutor wrapping for caller-run tasks. Consider a bounded queue/fixed pool or explicitly bounding in-flight submissions to enforce the configured limit.
    List<Future<?>> uploadFutures = new ArrayList<>();

    try {
This implementation queues a Future for every index file and holds them in uploadFutures until the end. For large indexes this can create significant memory overhead and delays error reporting. Consider processing completed tasks as they finish (e.g., ExecutorCompletionService) and/or limiting in-flight submissions to maxParallelUploads.
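For reference, the ExecutorCompletionService pattern suggested here consumes results in completion order, so a failure can surface as soon as any task finishes. A standalone sketch (illustrative names, not the PR's code):

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionExample {
    // Submit taskCount tasks and drain results as they complete, instead of
    // collecting every Future and joining them in submission order.
    static int sumAsCompleted(ExecutorService pool, int taskCount) throws Exception {
        CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
        for (int i = 1; i <= taskCount; i++) {
            final int v = i;
            cs.submit(() -> v);
        }
        int sum = 0;
        for (int i = 0; i < taskCount; i++) {
            // take() returns futures in completion order; get() rethrows failures early
            sum += cs.take().get();
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        System.out.println(sumAsCompleted(pool, 4));
        pool.shutdown();
    }
}
```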
This really doesn't hold. We need to wait for all futures anyway, so storing them in a list is what we need to do here.
    } catch (RuntimeException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw e;
In the synchronous path, rethrowing only e.getCause() (when it’s an IOException) loses the wrapper message that includes the filename ("Failed to process file: ..."). Preserve that per-file context when propagating errors so backup failures are diagnosable.
    } catch (RuntimeException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw e;
In the synchronous path, unwrapping and rethrowing only e.getCause() drops the wrapper message that includes per-file context (filename). Preserve the wrapper message (or re-wrap the cause including the filename) so restore failures remain actionable.
    List<Future<?>> downloadFutures = new ArrayList<>();
This submits one task per index file and retains every Future in downloadFutures until the end. For large collections with many segment files, that can add substantial memory/GC overhead and delays surfacing failures until all tasks are submitted. Consider processing completions incrementally (e.g., ExecutorCompletionService) and/or bounding the number of in-flight tasks to maxParallelDownloads.
        Throwable cause = e.getCause();
        // Unwrap RuntimeExceptions that wrap the original IOException
        if (cause instanceof RuntimeException && cause.getCause() != null) {
          firstError = cause.getCause();
        } else {
In the parallel join logic, unwrapping RuntimeException to cause.getCause() can discard the wrapper message that includes the filename. Preserve the wrapper message (or re-wrap the underlying IOException with file context) when surfacing the first failure from future.get().
- Replace unsafe IOException cast with `new IOException(msg, cause)` to preserve the original cause chain in IncrementalShardBackup and RestoreCore - Simplify ExecutionException handling by removing unnecessary RuntimeException unwrapping; directly assign `e.getCause()` as the first error - Fix changelog entry: rename `github` field to `nick` for author metadata
|
As far as I can see, that test failure is not related to these changes.
mlbiscoc
left a comment
I pulled this in to test and saw a significant improvement myself. Thanks @elangelo! For a 400GB index that took 20 minutes, with the parallelization, finished in 6 minutes for me. We should look to get this merged.
This PR here looks to be solving https://issues.apache.org/jira/browse/SOLR-17208. Can you update the PR to use this JIRA? CC @gerlowskija
    +
    Maximum number of index files to upload in parallel during backup operations.
    Can also be set via the `SOLR_BACKUP_MAXPARALLELUPLOADS` environment variable.
    For cloud storage repositories (S3, GCS), consider setting this to `8` or higher to improve backup performance.
How did you come up with the number 8? Was it just the number you set? Just curious. I would probably not place a number here to recommend as this depends on a persons hardware.
I landed on 8 as that was where I had the biggest throughput in my tests. Higher and I would see too many retries to the S3 buckets; lower and I was not using the full capacity of the target SolrCloud cluster. Mind you, the 8 is really great for disaster recovery scenarios. If you want to restore to a live SolrCloud cluster I would not use 8, as it does cause a significant load on the cluster (especially IOPS and bandwidth on the disks).
So I can document all of this of course, but I never saw any kind of 'operating sanity defaults' anywhere else in the SolrCloud documentation, so I didn't.
Got it, glad the 8 worked, but I still personally don't think we should put 8 here. You also just said not to use 8 on a live cluster as it causes a significant load, which counters your recommendation. It is a tuning parameter, so the user should figure it out for their cloud and hardware, not from a number we placed in the ref guide without context of their setup. I think if you want to place a recommendation, it is completely fair to say something along the lines of "Increasing this value can improve backup/restore throughput, but too high can result in xyz problems. Recommend starting small and increasing based on your SolrCloud's observed throughput."
We could set the thread priority of this pool lower so as to reduce its CPU impact and indirectly then IO. This could be done by adding such an option to SolrNamedThreadFactory in another constructor.
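The "lower thread priority" idea could look roughly like the factory below. SolrNamedThreadFactory is Solr's class; this standalone factory (name and constructor are illustrative) just shows the shape of such an added constructor:

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class LowPriorityThreadFactory implements ThreadFactory {
    private final String prefix;
    private final int priority;
    private final AtomicInteger count = new AtomicInteger();

    LowPriorityThreadFactory(String prefix, int priority) {
        this.prefix = prefix;
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + count.incrementAndGet());
        t.setDaemon(true);
        // e.g. Thread.MIN_PRIORITY so backup/restore transfers yield CPU to query traffic
        t.setPriority(priority);
        return t;
    }

    public static void main(String[] args) {
        Thread t = new LowPriorityThreadFactory("IncrementalBackup", Thread.MIN_PRIORITY)
            .newThread(() -> {});
        System.out.println(t.getPriority());
        System.out.println(t.getName());
    }
}
```

Note that thread priority is only a hint to the OS scheduler, so the IO-pressure reduction would be indirect at best.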
        60L,
        TimeUnit.SECONDS,
        new SynchronousQueue<>(),
        new SolrNamedThreadFactory("IncrementalBackup"),
Maybe suffix it with executor like other threadpools do.
Do you mean the
-        new SolrNamedThreadFactory("IncrementalBackup"),
+        new SolrNamedThreadFactory("IncrementalBackupExecutor"),
?
            continue;
          // Only use an executor for parallel uploads when parallelism > 1
          // When set to 1, run synchronously to avoid thread-local state issues with CallerRunsPolicy
          int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
Just use DEFAULT_MAX_PARALLEL_UPLOADS. No need to assign it to a new variable and re-pass it to the executor.
Refactor IncrementalShardBackup and RestoreCore to use shared static ExecutorService instances instead of creating new executor per operation. Simplifies conditional logic for parallel vs sequential operations and avoids thread pool creation overhead. Move executor lifecycle management outside the try-finally block since shared pools should not be shut down.
Revise recommendations for backup and restore parallelism settings to emphasize resource constraints and iterative tuning. Update tip to clarify that global thread pools are shared across concurrent operations on a node.
Add thread name filters for IncrementalBackupExecutor and RestoreCoreExecutor to SolrIgnoredThreadsFilter so that static pool threads are properly excluded from test leak detection.
        firstError = e.getCause();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
If any of them fail, you need to cancel the remaining futures here with future.cancel(true) to stop executing the rest of the jobs in the queue. I believe all this does is interrupt the calling thread.
Right. I'd actually flip the try and for nesting so we don't continue loop iterations on an error condition. Don't bother propagating interruption status to this thread as we're going to end things expeditiously and report the error.
Looking forward to Structured Concurrency some day to make this overall easier/better. We'd have to --enable-preview since it's still incubating :-/
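The "flip the try and for nesting" shape being suggested might look like this standalone sketch (illustrative names, not the PR's code): the first failure breaks out of the wait loop, the remaining futures are cancelled, and the error is reported immediately.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FailFastJoin {
    // try wraps the for loop, so we stop waiting on the first failure
    // instead of continuing loop iterations on an error condition.
    static void joinAll(List<Future<?>> futures) throws IOException {
        try {
            for (Future<?> f : futures) {
                f.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            for (Future<?> f : futures) {
                f.cancel(true); // interrupts still-running tasks; no-op for completed ones
            }
            Throwable cause = (e instanceof ExecutionException && e.getCause() != null)
                ? e.getCause() : e;
            throw new IOException("Parallel transfer failed", cause);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<?>> futures = new ArrayList<>();
        futures.add(pool.submit(() -> {}));
        futures.add(pool.submit((Runnable) () -> { throw new RuntimeException("boom"); }));
        boolean failed = false;
        try {
            joinAll(futures);
        } catch (IOException e) {
            failed = true;
        }
        System.out.println(failed);
        pool.shutdown();
    }
}
```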
        throw new IOException("Error during parallel backup upload", firstError);
      }
    }
  } finally {
Just remove this finally since it does nothing now.
      if (firstError instanceof Error) {
        // Rethrow Errors (like OutOfMemoryError) - don't try to recover
        throw (Error) firstError;
      } else if (firstError instanceof IOException) {
        throw (IOException) firstError;
      } else if (firstError instanceof RuntimeException) {
        throw (RuntimeException) firstError;
      } else if (firstError instanceof InterruptedException) {
        throw new IOException("Backup interrupted", firstError);
      } else {
        throw new IOException("Error during parallel backup upload", firstError);
      }
In Java 21 you can use a switch case here
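For illustration, the instanceof chain above rewritten as a Java 21 pattern-matching switch might look like this standalone sketch (class and method names are illustrative):

```java
import java.io.IOException;

public class SwitchRethrow {
    // Pattern-matching switch over the first recorded error (requires Java 21+).
    static IOException classify(Throwable firstError) {
        return switch (firstError) {
            case Error err -> throw err; // rethrow Errors (like OutOfMemoryError) unchanged
            case IOException io -> io;
            case RuntimeException re -> throw re;
            case InterruptedException ie -> new IOException("Backup interrupted", ie);
            default -> new IOException("Error during parallel backup upload", firstError);
        };
    }

    public static void main(String[] args) {
        System.out.println(classify(new IOException("disk full")).getMessage());
        System.out.println(classify(new Exception("other")).getMessage());
    }
}
```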
          backupStats.skippedUploadingFile(existedFileCS);
          continue;
        }
        ExecutorService executor = BACKUP_EXECUTOR;
Just use BACKUP_EXECUTOR instead of doing this.
     * property {@code solr.backup.maxparalleluploads} or environment variable {@code
     * SOLR_BACKUP_MAXPARALLELUPLOADS}.
     */
    private static final int DEFAULT_MAX_PARALLEL_UPLOADS =
This should just be MAX_PARALLEL_UPLOADS; drop the DEFAULT_ prefix.
        throw new IOException("Backup interrupted", firstError);
      } else {
        throw new IOException("Error during parallel backup upload", firstError);
You threw a SolrException in RestoreCore but throw IOException here.
At most API layers in Solr, SolrException is best.
    Increasing this value can significantly improve restore throughput when using cloud storage (S3, GCS), but too high a value will increase IOPS and bandwidth pressure on your cluster.
    Start small and increase based on observed throughput and available resources.

    TIP: Both settings share a single global thread pool per property, so the configured limit applies across all concurrent backup or restore operations on the node.
I misread this the first time as both backup and restore sharing a single pool, which contradicted the settings. Can you rewrite this to clearly state that these are 2 separate thread pools?
    Increasing this value can significantly improve backup throughput when using cloud storage (S3, GCS), but too high a value will increase IOPS and bandwidth pressure on your cluster.
    Start small and increase based on observed throughput and available resources.
This is basically written twice. Maybe just place it once below, noting that it applies to both of these properties.
    Runnable uploadTask =
        () -> {
          try {
            // Calculate checksum and check if file already exists in previous backup
            Optional<ShardBackupMetadata.BackedFile> opBackedFile =
                oldBackupPoint.getFile(fileNameFinal);
            Checksum originalFileCS = backupRepo.checksum(dir, fileNameFinal);

            if (opBackedFile.isPresent()) {
              ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
              Checksum existedFileCS = backedFile.fileChecksum;
              if (existedFileCS.equals(originalFileCS)) {
                synchronized (currentBackupPoint) {
                  currentBackupPoint.addBackedFile(opBackedFile.get());
                }
                backupStats.skippedUploadingFile(existedFileCS);
                return;
              }
            }

            // File doesn't exist or has changed - upload it
            String backedFileName = UUID.randomUUID().toString();
            backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, backedFileName);

            synchronized (currentBackupPoint) {
              currentBackupPoint.addBackedFile(backedFileName, fileNameFinal, originalFileCS);
            }
            backupStats.uploadedFile(originalFileCS);
          } catch (IOException e) {
            throw new RuntimeException("Failed to process file: " + fileNameFinal, e);
          }
        };
Nit: For these lambdas, can we put them in a private method?
    private static final ExecutorService BACKUP_EXECUTOR =
        ExecutorUtil.newMDCAwareCachedThreadPool(
            Math.max(1, DEFAULT_MAX_PARALLEL_UPLOADS),
I wouldn't bother with the 'max'; someone configuring this should know what they are doing. Heck if they put -1 thinking it might disable, they'll now probably get the error they should get.
    private static final int DEFAULT_MAX_PARALLEL_UPLOADS =
        EnvUtils.getPropertyAsInteger("solr.backup.maxparalleluploads", 1);

    private static final ExecutorService BACKUP_EXECUTOR =
as this is used for "uploads" (in the context of backups) and not for backups themselves, I think the name should reflect that, like BACKUP_UPLOAD_EXECUTOR (and similarly for the thread name)
    // Wait for ALL futures to ensure all files are processed
    Throwable firstError = null;
    for (Future<?> future : downloadFutures) {
Same code-review feedback applies to this class.
      }

      // Static backup/restore thread pools - stateless, no core references, threads expire on idle
      if (threadName.startsWith("IncrementalBackupExecutor-")
This should be touched sparingly. We care a lot about cleaning up resources completely between tests, so as long as a CoreContainer shuts down, all the ExecutorServices inside had better shut down completely as part of the shutdown sequence.
Description
This PR ensures multiple threads are used to create backups and to restore backups. This gives a considerable speedup when using cloud storage such as S3.
For comparison, a backup to S3 of 1.8 TiB takes roughly 16 minutes with this code, while a 340 GiB collection on the old code takes roughly 50 minutes.
Restoring the same collection took 7 minutes instead of 1 hour and 20 minutes (on a 6-node cluster).
Solution
As the previous implementation already had a loop over all files that needed to be backed up to the backup repository, I simply wrapped that in a ThreadPoolExecutor.
Tests
I have run this code locally on a SolrCloud cluster.
Checklist
Please review the following and check all that apply: