|
18 | 18 |
|
19 | 19 | import com.mongodb.ClusterFixture; |
20 | 20 | import com.mongodb.MongoClientSettings; |
| 21 | +import com.mongodb.event.ConnectionCheckOutFailedEvent; |
21 | 22 | import com.mongodb.event.ConnectionPoolClearedEvent; |
22 | 23 | import com.mongodb.event.ConnectionPoolListener; |
23 | 24 | import com.mongodb.event.ConnectionPoolReadyEvent; |
|
47 | 48 | import java.util.Set; |
48 | 49 | import java.util.concurrent.BlockingQueue; |
49 | 50 | import java.util.concurrent.CountDownLatch; |
| 51 | +import java.util.concurrent.ExecutorService; |
| 52 | +import java.util.concurrent.Executors; |
50 | 53 | import java.util.concurrent.LinkedBlockingQueue; |
| 54 | +import java.util.concurrent.atomic.AtomicInteger; |
51 | 55 |
|
52 | 56 | import static com.mongodb.ClusterFixture.configureFailPoint; |
53 | 57 | import static com.mongodb.ClusterFixture.disableFailPoint; |
@@ -268,6 +272,80 @@ public void shouldEmitHeartbeatStartedBeforeSocketIsConnected() { |
268 | 272 | // As it requires mocking and package access to `com.mongodb.internal.connection` |
269 | 273 | } |
270 | 274 |
|
| 275 | + /** |
| 276 | + * See |
| 277 | + * <a href="https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring-tests.md#connection-pool-backpressure">Connection Pool Backpressure</a>. |
| 278 | + */ |
| 279 | + @Test |
| 280 | + public void testConnectionPoolBackpressure() throws InterruptedException { |
| 281 | + assumeTrue(serverVersionAtLeast(7, 0)); |
| 282 | + |
| 283 | + AtomicInteger connectionCheckOutFailedEventCount = new AtomicInteger(0); |
| 284 | + AtomicInteger poolClearedEventCount = new AtomicInteger(0); |
| 285 | + |
| 286 | + ConnectionPoolListener connectionPoolListener = new ConnectionPoolListener() { |
| 287 | + @Override |
| 288 | + public void connectionCheckOutFailed(final ConnectionCheckOutFailedEvent event) { |
| 289 | + connectionCheckOutFailedEventCount.incrementAndGet(); |
| 290 | + } |
| 291 | + |
| 292 | + @Override |
| 293 | + public void connectionPoolCleared(final ConnectionPoolClearedEvent event) { |
| 294 | + poolClearedEventCount.incrementAndGet(); |
| 295 | + } |
| 296 | + }; |
| 297 | + |
| 298 | + MongoClientSettings clientSettings = getMongoClientSettingsBuilder() |
| 299 | + .applyToConnectionPoolSettings(builder -> builder |
| 300 | + .maxConnecting(100) |
| 301 | + .addConnectionPoolListener(connectionPoolListener)) |
| 302 | + .build(); |
| 303 | + |
| 304 | + try (MongoClient adminClient = MongoClients.create(getMongoClientSettingsBuilder().build()); |
| 305 | + MongoClient client = MongoClients.create(clientSettings)) { |
| 306 | + |
| 307 | + MongoDatabase adminDatabase = adminClient.getDatabase("admin"); |
| 308 | + MongoDatabase database = client.getDatabase(getDefaultDatabaseName()); |
| 309 | + MongoCollection<Document> collection = database.getCollection("testCollection"); |
| 310 | + |
| 311 | + // Configure rate limiter using admin commands |
| 312 | + adminDatabase.runCommand(new Document("setParameter", 1) |
| 313 | + .append("ingressConnectionEstablishmentRateLimiterEnabled", true)); |
| 314 | + adminDatabase.runCommand(new Document("setParameter", 1) |
| 315 | + .append("ingressConnectionEstablishmentRatePerSec", 20)); |
| 316 | + adminDatabase.runCommand(new Document("setParameter", 1) |
| 317 | + .append("ingressConnectionEstablishmentBurstCapacitySecs", 1)); |
| 318 | + adminDatabase.runCommand(new Document("setParameter", 1) |
| 319 | + .append("ingressConnectionEstablishmentMaxQueueDepth", 1)); |
| 320 | + |
| 321 | + // Add a document to the collection |
| 322 | + collection.insertOne(Document.parse("{}"));// change |
| 323 | + |
| 324 | + // Run 100 parallel find operations with 2-seconds sleep |
| 325 | + ExecutorService executor = Executors.newFixedThreadPool(100); |
| 326 | + for (int i = 0; i < 100; i++) { |
| 327 | + executor.submit(() -> collection.find(new Document("$where", "function() { sleep(2000); return true; }")).first()); |
| 328 | + } |
| 329 | + |
| 330 | + // Wait for all operations to complete (max 10 seconds) |
| 331 | + executor.shutdown(); |
| 332 | + boolean terminated = executor.awaitTermination(10, SECONDS); |
| 333 | + assertTrue("Executor did not terminate within timeout", terminated); |
| 334 | + |
| 335 | + // Assert at least 10 ConnectionCheckOutFailedEvents occurred |
| 336 | + assertTrue("Expected at least 10 ConnectionCheckOutFailedEvents, but got " + connectionCheckOutFailedEventCount.get(), |
| 337 | + connectionCheckOutFailedEventCount.get() >= 10); |
| 338 | + |
| 339 | + // Assert 0 PoolClearedEvents occurred |
| 340 | + assertEquals("Expected 0 PoolClearedEvents", 0, poolClearedEventCount.get()); |
| 341 | + |
| 342 | + // Teardown: sleep 1 second and reset rate limiter |
| 343 | + Thread.sleep(1000); |
| 344 | + adminDatabase.runCommand(new Document("setParameter", 1) |
| 345 | + .append("ingressConnectionEstablishmentRateLimiterEnabled", false)); |
| 346 | + } |
| 347 | + } |
| 348 | + |
271 | 349 | private static void assertPoll(final BlockingQueue<?> queue, @Nullable final Class<?> allowed, final Set<Class<?>> required) |
272 | 350 | throws InterruptedException { |
273 | 351 | assertPoll(queue, allowed, required, Timeout.expiresIn(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS, ZERO_DURATION_MEANS_EXPIRED)); |
|
0 commit comments