[flink] introduce batched splits assignment mechanism#3288
Open
zuston wants to merge 2 commits intoapache:mainfrom
Open
[flink] introduce batched splits assignment mechanism#3288zuston wants to merge 2 commits intoapache:mainfrom
zuston wants to merge 2 commits intoapache:mainfrom
Conversation
Contributor
Contributor
There was a problem hiding this comment.
Pull request overview
This PR introduces a per-reader batched split assignment mechanism in the Flink source enumerator to reduce the risk of exceeding Flink RPC message size limits when a reader is assigned a very large number of splits.
Changes:
- Add a new connector option
scan.split.assignment.batch-sizeand plumb it through builder/table sources intoFlinkSourceEnumerator. - Update
FlinkSourceEnumeratorto partition per-reader split assignments into multipleassignSplits(...)calls. - Extend/adjust enumerator unit tests to account for batched assignment sequences and add validation coverage for invalid batch size.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java | Adds tests for batched assignment behavior and invalid batch size; updates assignment assertions to aggregate multiple assignment events. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java | Adds builder support for configuring split-assignment batch size and applies defaulting. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java | Threads batch-size into FlinkSource construction for DataStream usage. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java | Threads batch-size through table source construction and copy methods. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java | Adds/extends constructors and propagates batch-size into the enumerator. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java | Implements the actual per-reader batching in split assignment and validates the batch-size argument. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java | Threads batch-size into changelog table source runtime provider and copy. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java | Threads batch-size into binlog table source runtime provider and copy. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java | Introduces the new scan.split.assignment.batch-size config option. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java | Reads the new option from table config, passes it into created sources, and exposes it as an optional option. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+146
to
+149
| public FlussSourceBuilder<OUT> setSplitPerAssignmentBatchSize(int splitPerAssignmentBatchSize) { | ||
| this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize; | ||
| return this; | ||
| } |
Comment on lines
145
to
151
| long partitionDiscoveryIntervalMs = | ||
| tableOptions | ||
| .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL) | ||
| .toMillis(); | ||
| int splitAssignmentBatchSize = | ||
| tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE); | ||
|
|
Comment on lines
+153
to
+160
| public static final ConfigOption<Integer> SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE = | ||
| ConfigOptions.key("scan.split.assignment.batch-size") | ||
| .intType() | ||
| .defaultValue(Integer.MAX_VALUE) | ||
| .withDescription( | ||
| "The maximum number of Fluss source splits assigned to a reader in " | ||
| + "one assignment request. The value must be positive. By default, " | ||
| + "all pending splits for a reader are assigned in one request."); |
| @@ -927,7 +1043,19 @@ private void assignPendingSplits(Set<Integer> pendingReaders) { | |||
| // Assign pending splits to readers | |||
| if (!incrementalAssignment.isEmpty()) { | |||
| LOG.info("Assigning splits to readers {}", incrementalAssignment); | |||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Purpose
Linked issue: close #3287
This PR introduces per-subtask batched split assignment to avoid RPC payloads exceeding size limits.
Brief change log
Tests
API and Format
Documentation