Skip to content

Commit 4f95659

Browse files
committed
Fix flaky mongos test.
1 parent d478862 commit 4f95659

4 files changed

Lines changed: 39 additions & 21 deletions

File tree

driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.mongodb.internal.connection;
1818

19+
import com.mongodb.connection.ServerDescription;
20+
import com.mongodb.connection.ServerType;
1921
import com.mongodb.event.ClusterClosedEvent;
2022
import com.mongodb.event.ClusterDescriptionChangedEvent;
2123
import com.mongodb.event.ClusterListener;
@@ -115,6 +117,14 @@ public void waitForClusterDescriptionChangedEvents(
115117
}
116118
}
117119

120+
public void waitForAllServersDiscovered(final Duration duration) throws InterruptedException, TimeoutException {
121+
waitForClusterDescriptionChangedEvents(
122+
event -> event.getNewDescription().getServerDescriptions().stream()
123+
.map(ServerDescription::getType)
124+
.noneMatch(ServerType.UNKNOWN::equals),
125+
1, duration);
126+
}
127+
118128
/**
119129
* Waits for the cluster to be closed, which is signaled by a {@link ClusterClosedEvent}.
120130
*/

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.junit.jupiter.api.Disabled;
2222
import org.junit.jupiter.api.Test;
2323

24+
import java.util.concurrent.TimeoutException;
25+
2426
/**
2527
* <a href="https://github.com/mongodb/specifications/blob/master/source/retryable-writes/tests/README.md#prose-tests">
2628
* Prose Tests</a>.
@@ -52,7 +54,7 @@ void originalErrorMustBePropagatedIfNoWritesPerformed() throws Exception {
5254
* 4. Test that in a sharded cluster writes are retried on a different mongos when one is available</a>.
5355
*/
5456
@Test
55-
void retriesOnDifferentMongosWhenAvailable() {
57+
void retriesOnDifferentMongosWhenAvailable() throws InterruptedException, TimeoutException {
5658
com.mongodb.client.RetryableWritesProseTest.retriesOnDifferentMongosWhenAvailable(
5759
SyncMongoClient::new,
5860
mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true);

driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableReadsProseTest.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.mongodb.ReadPreference;
2121
import com.mongodb.ServerAddress;
2222
import com.mongodb.client.test.CollectionHelper;
23-
import com.mongodb.connection.ClusterDescription;
2423
import com.mongodb.event.CommandFailedEvent;
2524
import com.mongodb.event.CommandSucceededEvent;
2625
import com.mongodb.internal.connection.TestClusterListener;
@@ -85,7 +84,7 @@ void poolClearedExceptionMustBeRetryable() throws Exception {
8584
* 2.1 Retryable Reads Are Retried on a Different mongos When One is Available</a>.
8685
*/
8786
@Test
88-
void retriesOnDifferentMongosWhenAvailable() {
87+
void retriesOnDifferentMongosWhenAvailable() throws InterruptedException, TimeoutException {
8988
RetryableWritesProseTest.retriesOnDifferentMongosWhenAvailable(this::createClient,
9089
mongoCollection -> {
9190
try (MongoCursor<Document> cursor = mongoCollection.find().iterator()) {
@@ -236,20 +235,14 @@ private void testRetriedOnTheSameServer(final BsonDocument configureFailPoint) t
236235
}
237236

238237
private void waitForClusterDiscovery() throws InterruptedException, TimeoutException {
239-
clusterListener.waitForClusterDescriptionChangedEvents(
240-
event -> {
241-
ClusterDescription desc = event.getNewDescription();
242-
// We need both primary and secondary to be discovered (not UNKNOWN) before running the deprioritization tests.
243-
//
244-
// 1. The failpoint is set on the primary. If the primary is not yet discovered,
245-
// primaryPreferred may route the find to a secondary, and the failpoint never fires.
246-
//
247-
// 2. When the primary is deprioritized on retry, primaryPreferred falls back to a secondary.
248-
// If the secondaries are still UNKNOWN at that point, the fallback yields no selectable servers,
249-
// causing the deprioritized primary to be selected again.
250-
return desc.hasReadableServer(ReadPreference.primary())
251-
&& desc.hasReadableServer(ReadPreference.secondary());
252-
},
253-
1, Duration.ofSeconds(10));
238+
// We need both primary and secondary to be discovered (not UNKNOWN) before running the deprioritization tests.
239+
//
240+
// 1. The failpoint is set on the primary. If the primary is not yet discovered,
241+
// primaryPreferred may route the find to a secondary, and the failpoint never fires.
242+
//
243+
// 2. When the primary is deprioritized on retry, primaryPreferred falls back to a secondary.
244+
// If the secondaries are still UNKNOWN at that point, the fallback yields no selectable servers,
245+
// causing the deprioritized primary to be selected again.
246+
clusterListener.waitForAllServersDiscovered(Duration.ofSeconds(10));
254247
}
255248
}

driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.mongodb.event.ConnectionCheckedOutEvent;
3333
import com.mongodb.event.ConnectionPoolClearedEvent;
3434
import com.mongodb.internal.connection.ServerAddressHelper;
35+
import com.mongodb.internal.connection.TestClusterListener;
3536
import com.mongodb.internal.connection.TestCommandListener;
3637
import com.mongodb.internal.connection.TestConnectionPoolListener;
3738
import com.mongodb.internal.event.ConfigureFailPointCommandListener;
@@ -42,13 +43,15 @@
4243
import org.junit.jupiter.api.Disabled;
4344
import org.junit.jupiter.api.Test;
4445

46+
import java.time.Duration;
4547
import java.util.HashSet;
4648
import java.util.List;
4749
import java.util.Set;
4850
import java.util.concurrent.ExecutorService;
4951
import java.util.concurrent.Executors;
5052
import java.util.concurrent.Future;
5153
import java.util.concurrent.TimeUnit;
54+
import java.util.concurrent.TimeoutException;
5255
import java.util.function.Predicate;
5356
import java.util.stream.Collectors;
5457

@@ -224,15 +227,16 @@ public static void originalErrorMustBePropagatedIfNoWritesPerformed(
224227
* 4. Test that in a sharded cluster writes are retried on a different mongos when one is available</a>.
225228
*/
226229
@Test
227-
void retriesOnDifferentMongosWhenAvailable() {
230+
void retriesOnDifferentMongosWhenAvailable() throws InterruptedException, TimeoutException {
228231
retriesOnDifferentMongosWhenAvailable(MongoClients::create,
229232
mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true);
230233
}
231234

232235
@SuppressWarnings("try")
233236
public static <R> void retriesOnDifferentMongosWhenAvailable(
234237
final Function<MongoClientSettings, MongoClient> clientCreator,
235-
final Function<MongoCollection<Document>, R> operation, final String expectedCommandName, final boolean write) {
238+
final Function<MongoCollection<Document>, R> operation, final String expectedCommandName, final boolean write)
239+
throws InterruptedException, TimeoutException {
236240
if (write) {
237241
assumeTrue(serverVersionAtLeast(4, 4));
238242
}
@@ -253,15 +257,24 @@ public static <R> void retriesOnDifferentMongosWhenAvailable(
253257
+ " }\n"
254258
+ "}\n");
255259
TestCommandListener commandListener = new TestCommandListener(singletonList("commandFailedEvent"), emptyList());
260+
TestClusterListener clusterListener = new TestClusterListener();
256261
try (FailPoint s0FailPoint = FailPoint.enable(configureFailPoint, s0Address);
257262
FailPoint s1FailPoint = FailPoint.enable(configureFailPoint, s1Address);
258263
MongoClient client = clientCreator.apply(getMultiMongosMongoClientSettingsBuilder()
259264
.retryReads(true)
260265
.retryWrites(true)
261266
.addCommandListener(commandListener)
262267
// explicitly specify only s0 and s1, in case `getMultiMongosMongoClientSettingsBuilder` has more
263-
.applyToClusterSettings(builder -> builder.hosts(asList(s0Address, s1Address)))
268+
.applyToClusterSettings(builder -> builder
269+
.hosts(asList(s0Address, s1Address))
270+
.addClusterListener(clusterListener))
264271
.build())) {
272+
// We need both mongos servers to be discovered (not UNKNOWN) before running the deprioritization test.
273+
// When the first mongos is deprioritized on retry, the selector falls back to the second mongos.
274+
// If the second mongos is still UNKNOWN at that point, the non-deprioritized pass yields no selectable servers,
275+
// causing the deprioritized mongos to be selected again.
276+
clusterListener.waitForAllServersDiscovered(Duration.ofSeconds(10));
277+
265278
MongoCollection<Document> collection = dropAndGetCollection("retriesOnDifferentMongosWhenAvailable", client);
266279
commandListener.reset();
267280
assertThrows(MongoServerException.class, () -> operation.apply(collection));

0 commit comments

Comments
 (0)