Skip to content

Commit 44541fc

Browse files
vbabaninstIncMalerozza
authored
Add support for server selection's deprioritized servers (#1860)
- Deprioritize sharded clusters on any error, all other topologies only on SystemOverloadedError. - Pass ClusterType to updateCandidate so onAttemptFailure can distinguish topology types. - Add retryable reads prose tests 3.1 and 3.2. - Change ServerSelectionSelectionTest to use BaseCluster server selection chain. JAVA-6105 JAVA-6021 JAVA-6074 --------- Co-authored-by: Valentin Kovalenko <[email protected]> Co-authored-by: Ross Lawley <[email protected]>
1 parent ffe5242 commit 44541fc

87 files changed

Lines changed: 1217 additions & 758 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,9 @@ abstract class BaseCluster implements Cluster {
113113
private volatile ClusterDescription description;
114114

115115
BaseCluster(final ClusterId clusterId,
116-
final ClusterSettings settings,
117-
final ClusterableServerFactory serverFactory,
118-
final ClientMetadata clientMetadata) {
116+
final ClusterSettings settings,
117+
final ClusterableServerFactory serverFactory,
118+
final ClientMetadata clientMetadata) {
119119
this.clusterId = notNull("clusterId", clusterId);
120120
this.settings = notNull("settings", settings);
121121
this.serverFactory = notNull("serverFactory", serverFactory);
@@ -159,7 +159,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
159159
if (serverTuple != null) {
160160
ServerAddress serverAddress = serverTuple.getServerDescription().getAddress();
161161
logServerSelectionSucceeded(operationContext, clusterId, serverAddress, serverSelector, currentDescription);
162-
serverDeprioritization.updateCandidate(serverAddress);
162+
serverDeprioritization.updateCandidate(serverAddress, currentDescription.getType());
163163
return serverTuple;
164164
}
165165
computedServerSelectionTimeout.onExpired(() ->
@@ -302,7 +302,7 @@ private boolean handleServerSelectionRequest(
302302
if (serverTuple != null) {
303303
ServerAddress serverAddress = serverTuple.getServerDescription().getAddress();
304304
logServerSelectionSucceeded(operationContext, clusterId, serverAddress, request.originalSelector, description);
305-
serverDeprioritization.updateCandidate(serverAddress);
305+
serverDeprioritization.updateCandidate(serverAddress, description.getType());
306306
request.onResult(serverTuple, null);
307307
return true;
308308
}
@@ -361,8 +361,7 @@ private static ServerSelector getCompleteServerSelector(
361361
final ClusterSettings settings) {
362362
List<ServerSelector> selectors = Stream.of(
363363
getRaceConditionPreFilteringSelector(serversSnapshot),
364-
serverSelector,
365-
serverDeprioritization.getServerSelector(),
364+
serverDeprioritization.apply(serverSelector),
366365
settings.getServerSelector(), // may be null
367366
new LatencyMinimizingServerSelector(settings.getLocalThreshold(MILLISECONDS), MILLISECONDS),
368367
AtMostTwoRandomServerSelector.instance(),

driver-core/src/main/com/mongodb/internal/connection/OperationContext.java

Lines changed: 59 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.mongodb.Function;
1919
import com.mongodb.MongoConnectionPoolClearedException;
20+
import com.mongodb.MongoException;
2021
import com.mongodb.ReadConcern;
2122
import com.mongodb.RequestContext;
2223
import com.mongodb.ServerAddress;
@@ -27,7 +28,6 @@
2728
import com.mongodb.internal.IgnorableRequestContext;
2829
import com.mongodb.internal.TimeoutContext;
2930
import com.mongodb.internal.TimeoutSettings;
30-
import com.mongodb.internal.VisibleForTesting;
3131
import com.mongodb.internal.observability.micrometer.Span;
3232
import com.mongodb.internal.observability.micrometer.TracingManager;
3333
import com.mongodb.internal.session.SessionContext;
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.TimeUnit;
4141
import java.util.concurrent.atomic.AtomicLong;
4242

43+
import static com.mongodb.MongoException.SYSTEM_OVERLOADED_ERROR_LABEL;
4344
import static java.util.stream.Collectors.toList;
4445

4546
/**
@@ -76,7 +77,7 @@ public OperationContext(final RequestContext requestContext, final SessionContex
7677
null);
7778
}
7879

79-
public static OperationContext simpleOperationContext(
80+
static OperationContext simpleOperationContext(
8081
final TimeoutSettings timeoutSettings, @Nullable final ServerApi serverApi) {
8182
return new OperationContext(
8283
IgnorableRequestContext.INSTANCE,
@@ -113,6 +114,15 @@ public OperationContext withOperationName(final String operationName) {
113114
operationName, tracingSpan);
114115
}
115116

117+
/**
118+
* TODO-JAVA-6058: This method enables overriding the ServerDeprioritization state.
119+
* It is a temporary solution to handle cases where deprioritization state persists across operations.
120+
*/
121+
public OperationContext withNewServerDeprioritization() {
122+
return new OperationContext(id, requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), tracingManager, serverApi,
123+
operationName, tracingSpan);
124+
}
125+
116126
public long getId() {
117127
return id;
118128
}
@@ -152,8 +162,7 @@ public void setTracingSpan(final Span tracingSpan) {
152162
this.tracingSpan = tracingSpan;
153163
}
154164

155-
@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
156-
public OperationContext(final long id,
165+
private OperationContext(final long id,
157166
final RequestContext requestContext,
158167
final SessionContext sessionContext,
159168
final TimeoutContext timeoutContext,
@@ -174,26 +183,6 @@ public OperationContext(final long id,
174183
this.tracingSpan = tracingSpan;
175184
}
176185

177-
@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
178-
public OperationContext(final long id,
179-
final RequestContext requestContext,
180-
final SessionContext sessionContext,
181-
final TimeoutContext timeoutContext,
182-
final TracingManager tracingManager,
183-
@Nullable final ServerApi serverApi,
184-
@Nullable final String operationName) {
185-
this.id = id;
186-
this.serverDeprioritization = new ServerDeprioritization();
187-
this.requestContext = requestContext;
188-
this.sessionContext = sessionContext;
189-
this.timeoutContext = timeoutContext;
190-
this.tracingManager = tracingManager;
191-
this.serverApi = serverApi;
192-
this.operationName = operationName;
193-
this.tracingSpan = null;
194-
}
195-
196-
197186
/**
198187
* @return The same {@link ServerDeprioritization} if called on the same {@link OperationContext}.
199188
*/
@@ -227,34 +216,46 @@ public OperationContext withOverride(final TimeoutContextOverride timeoutContext
227216
public static final class ServerDeprioritization {
228217
@Nullable
229218
private ServerAddress candidate;
219+
@Nullable
220+
private ClusterType clusterType;
230221
private final Set<ServerAddress> deprioritized;
231-
private final DeprioritizingSelector selector;
232222

233223
private ServerDeprioritization() {
234224
candidate = null;
235225
deprioritized = new HashSet<>();
236-
selector = new DeprioritizingSelector();
226+
clusterType = null;
237227
}
238228

239229
/**
240-
* The returned {@link ServerSelector} tries to {@linkplain ServerSelector#select(ClusterDescription) select}
241-
* only the {@link ServerDescription}s that do not have deprioritized {@link ServerAddress}es.
242-
* If no such {@link ServerDescription} can be selected, then it selects {@link ClusterDescription#getServerDescriptions()}.
230+
* The returned {@link ServerSelector} wraps the provided selector and attempts
231+
* {@linkplain ServerSelector#select(ClusterDescription) server selection} in two passes:
232+
* <ol>
233+
* <li>First pass: selects using the wrapped selector with only non-deprioritized {@link ServerDescription}s.</li>
234+
* <li>Second pass: if the first pass selects no {@link ServerDescription}s,
235+
* selects using the wrapped selector again with all {@link ServerDescription}s, including deprioritized ones.</li>
236+
* </ol>
243237
*/
244-
ServerSelector getServerSelector() {
245-
return selector;
238+
ServerSelector apply(final ServerSelector wrappedSelector) {
239+
return new DeprioritizingSelector(wrappedSelector);
246240
}
247241

248-
void updateCandidate(final ServerAddress serverAddress) {
249-
candidate = serverAddress;
242+
void updateCandidate(final ServerAddress serverAddress, final ClusterType clusterType) {
243+
this.candidate = serverAddress;
244+
this.clusterType = clusterType;
250245
}
251246

252247
public void onAttemptFailure(final Throwable failure) {
253248
if (candidate == null || failure instanceof MongoConnectionPoolClearedException) {
254249
candidate = null;
255250
return;
256251
}
257-
deprioritized.add(candidate);
252+
253+
// As per spec: sharded clusters deprioritize on any error, other topologies only on overload
254+
boolean isSystemOverloadedError = failure instanceof MongoException
255+
&& ((MongoException) failure).hasErrorLabel(SYSTEM_OVERLOADED_ERROR_LABEL);
256+
if (clusterType == ClusterType.SHARDED || isSystemOverloadedError) {
257+
deprioritized.add(candidate);
258+
}
258259
}
259260

260261
/**
@@ -263,24 +264,41 @@ public void onAttemptFailure(final Throwable failure) {
263264
* which indeed may be used concurrently. {@link DeprioritizingSelector} does not need to be thread-safe.
264265
*/
265266
private final class DeprioritizingSelector implements ServerSelector {
266-
private DeprioritizingSelector() {
267+
private final ServerSelector wrappedSelector;
268+
269+
private DeprioritizingSelector(final ServerSelector wrappedSelector) {
270+
this.wrappedSelector = wrappedSelector;
267271
}
268272

269273
@Override
270274
public List<ServerDescription> select(final ClusterDescription clusterDescription) {
271275
List<ServerDescription> serverDescriptions = clusterDescription.getServerDescriptions();
272-
if (!isEnabled(clusterDescription.getType())) {
273-
return serverDescriptions;
276+
277+
// TODO-JAVA-5908: Evaluate whether using the early-return optimization has a meaningful performance impact on server selection.
278+
if (serverDescriptions.size() == 1 || deprioritized.isEmpty()) {
279+
return wrappedSelector.select(clusterDescription);
274280
}
281+
282+
// TODO-JAVA-5908: Evaluate whether using a loop instead of Stream has a meaningful performance impact on server selection.
275283
List<ServerDescription> nonDeprioritizedServerDescriptions = serverDescriptions
276284
.stream()
277285
.filter(serverDescription -> !deprioritized.contains(serverDescription.getAddress()))
278286
.collect(toList());
279-
return nonDeprioritizedServerDescriptions.isEmpty() ? serverDescriptions : nonDeprioritizedServerDescriptions;
280-
}
281287

282-
private boolean isEnabled(final ClusterType clusterType) {
283-
return clusterType == ClusterType.SHARDED;
288+
// TODO-JAVA-5908: Evaluate whether using the early-return optimization has a meaningful performance impact on server selection.
289+
if (nonDeprioritizedServerDescriptions.isEmpty()) {
290+
return wrappedSelector.select(clusterDescription);
291+
}
292+
293+
List<ServerDescription> selected = wrappedSelector.select(
294+
new ClusterDescription(
295+
clusterDescription.getConnectionMode(),
296+
clusterDescription.getType(),
297+
clusterDescription.getSrvResolutionException(),
298+
nonDeprioritizedServerDescriptions,
299+
clusterDescription.getClusterSettings(),
300+
clusterDescription.getServerSettings()));
301+
return selected.isEmpty() ? wrappedSelector.select(clusterDescription) : selected;
284302
}
285303
}
286304
}

driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,9 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
7373
this.wrapped = new AtomicReference<>(assertNotNull(wrapped));
7474
this.binding = binding;
7575
binding.retain();
76-
this.initialOperationContext = operationContext.withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride);
76+
this.initialOperationContext = operationContext
77+
.withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride)
78+
.withNewServerDeprioritization();
7779
this.resumeToken = resumeToken;
7880
this.maxWireVersion = maxWireVersion;
7981
isClosed = new AtomicBoolean();

driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@ final class ChangeStreamBatchCursor<T> implements AggregateResponseBatchCursor<T
8585
final int maxWireVersion) {
8686
this.changeStreamOperation = changeStreamOperation;
8787
this.binding = binding.retain();
88-
this.initialOperationContext = operationContext.withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride);
88+
this.initialOperationContext = operationContext
89+
.withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride)
90+
.withNewServerDeprioritization();
8991
this.wrapped = wrapped;
9092
this.resumeToken = resumeToken;
9193
this.maxWireVersion = maxWireVersion;

0 commit comments

Comments
 (0)