Skip to content

Commit 895b339

Browse files
committed
Retry KMS requests on transient errors
Add libmongocrypt CAPI bindings for KMS retry support and wire retry logic through the sync and reactive driver stacks. Transient KMS HTTP and network errors are retried with backoff delays managed by libmongocrypt; retry is enabled unconditionally. - Add native bindings: mongocrypt_setopt_retry_kms, mongocrypt_kms_ctx_usleep, mongocrypt_kms_ctx_feed_with_retry, mongocrypt_kms_ctx_fail - Add sleepMicroseconds(), feedAndRetry(), fail() to MongoKeyDecryptor - Enable KMS retry unconditionally in MongoCryptImpl - Rewrite sync Crypt.decryptKey() with retry loop, timeout-aware - Add retry logic to reactive KeyManagementService.decryptKey() - Fix TlsChannelImpl.read() to preserve bytes delivered alongside close_notify (already fixed upstream in marianobarrios/tls-channel) - Add spec Section 24 KMS retry integration tests (sync + reactive) - Add Evergreen CI task for KMS retry tests JAVA-5391
1 parent e9c0c4b commit 895b339

12 files changed

Lines changed: 668 additions & 27 deletions

File tree

.evergreen/.evg.yml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,16 @@ functions:
764764
set +o xtrace
765765
MONGODB_URI="${MONGODB_URI}" KMS_TLS_ERROR_TYPE=${KMS_TLS_ERROR_TYPE} .evergreen/run-kms-tls-tests.sh
766766
767+
"run-kms-retry-test":
768+
- command: shell.exec
769+
type: "test"
770+
params:
771+
working_dir: "src"
772+
script: |
773+
${PREPARE_SHELL}
774+
set +o xtrace
775+
MONGODB_URI="${MONGODB_URI}" .evergreen/run-kms-retry-tests.sh
776+
767777
"run-csfle-aws-from-environment-test":
768778
- command: shell.exec
769779
type: "test"
@@ -1632,6 +1642,17 @@ tasks:
16321642
AUTH: "noauth"
16331643
SSL: "nossl"
16341644

1645+
- name: "test-kms-retry-task"
1646+
tags: [ "kms-retry" ]
1647+
commands:
1648+
- func: "start-mongo-orchestration"
1649+
vars:
1650+
TOPOLOGY: "server"
1651+
AUTH: "noauth"
1652+
SSL: "nossl"
1653+
- func: "start-csfle-servers"
1654+
- func: "run-kms-retry-test"
1655+
16351656
- name: "test-csfle-aws-from-environment-task"
16361657
tags: [ "csfle-aws-from-environment" ]
16371658
commands:
@@ -2528,6 +2549,12 @@ buildvariants:
25282549
tasks:
25292550
- name: ".kms-tls"
25302551

2552+
- matrix_name: "kms-retry-test"
2553+
matrix_spec: { os: "linux", version: [ "5.0" ], topology: [ "standalone" ] }
2554+
display_name: "CSFLE KMS Retry"
2555+
tasks:
2556+
- name: ".kms-retry"
2557+
25312558
- matrix_name: "csfle-aws-from-environment-test"
25322559
matrix_spec: { os: "linux", version: [ "5.0" ], topology: [ "standalone" ] }
25332560
display_name: "CSFLE AWS From Environment"

.evergreen/run-kms-retry-tests.sh

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#!/bin/bash
2+
3+
# Don't trace since the URI contains a password that shouldn't show up in the logs
4+
set -o errexit # Exit the script with error if any of the commands fail
5+
6+
# Supported/used environment variables:
7+
# MONGODB_URI Set the suggested connection MONGODB_URI (including credentials and topology info)
8+
9+
############################################
10+
# Main Program #
11+
############################################
12+
RELATIVE_DIR_PATH="$(dirname "${BASH_SOURCE:-$0}")"
13+
. "${RELATIVE_DIR_PATH}/setup-env.bash"
14+
echo "Running KMS Retry tests"
15+
16+
cp ${JAVA_HOME}/lib/security/cacerts mongo-truststore
17+
${JAVA_HOME}/bin/keytool -importcert -trustcacerts -file ${DRIVERS_TOOLS}/.evergreen/x509gen/ca.pem -keystore mongo-truststore -storepass changeit -storetype JKS -noprompt
18+
19+
export GRADLE_EXTRA_VARS="-Pssl.enabled=true -Pssl.trustStoreType=jks -Pssl.trustStore=`pwd`/mongo-truststore -Pssl.trustStorePassword=changeit"
20+
21+
./gradlew -version
22+
23+
# Disable errexit so both suites run and their exit codes can be captured below.
24+
set +o errexit
25+
26+
./gradlew --stacktrace --info ${GRADLE_EXTRA_VARS} -Dorg.mongodb.test.uri=${MONGODB_URI} \
27+
-Dorg.mongodb.test.kms.retry.ca.path="${DRIVERS_TOOLS}/.evergreen/x509gen/ca.pem" \
28+
driver-sync:cleanTest driver-sync:test --tests ClientSideEncryptionKmsRetryTest
29+
first=$?
30+
echo $first
31+
32+
./gradlew --stacktrace --info ${GRADLE_EXTRA_VARS} -Dorg.mongodb.test.uri=${MONGODB_URI} \
33+
-Dorg.mongodb.test.kms.retry.ca.path="${DRIVERS_TOOLS}/.evergreen/x509gen/ca.pem" \
34+
driver-reactive-streams:cleanTest driver-reactive-streams:test --tests ClientSideEncryptionKmsRetryTest
35+
second=$?
36+
echo $second
37+
38+
if [ $first -ne 0 ]; then
39+
exit $first
40+
elif [ $second -ne 0 ]; then
41+
exit $second
42+
else
43+
exit 0
44+
fi

driver-core/src/main/com/mongodb/internal/connection/tlschannel/impl/TlsChannelImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,10 +236,13 @@ public long read(ByteBufferSet dest) throws IOException {
236236
case NOT_HANDSHAKING:
237237
case FINISHED:
238238
UnwrapResult res = readAndUnwrap(Optional.of(dest));
239+
bytesToReturn = res.bytesProduced;
239240
if (res.wasClosed) {
240-
return -1;
241+
// JAVA-5391: return any bytes produced alongside close_notify; the next read
242+
// sees shutdownReceived and returns -1. Fixed in upstream marianobarrios/tls-channel;
243+
// this is the minimal patch until the vendored snapshot is refreshed.
244+
return bytesToReturn > 0 ? bytesToReturn : -1;
241245
}
242-
bytesToReturn = res.bytesProduced;
243246
handshakeStatus = res.lastHandshakeStatus;
244247
break;
245248
case NEED_TASK:

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/KeyManagementService.java

Lines changed: 71 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,12 @@
4848
import java.io.Closeable;
4949
import java.nio.channels.CompletionHandler;
5050
import java.nio.channels.InterruptedByTimeoutException;
51+
import java.time.Duration;
5152
import java.util.List;
5253
import java.util.Map;
5354

5455
import static java.util.Collections.singletonList;
56+
import static java.util.concurrent.TimeUnit.MICROSECONDS;
5557
import static java.util.concurrent.TimeUnit.MILLISECONDS;
5658
import static org.bson.assertions.Assertions.assertTrue;
5759

@@ -74,6 +76,19 @@ public void close() {
7476
}
7577

7678
Mono<Void> decryptKey(final MongoKeyDecryptor keyDecryptor, @Nullable final Timeout operationTimeout) {
79+
return Mono.defer(() -> {
80+
Timeout.onExistsAndExpired(operationTimeout, () -> {
81+
throw TimeoutContext.createMongoTimeoutException(TIMEOUT_ERROR_MESSAGE);
82+
});
83+
long sleepMicros = keyDecryptor.sleepMicroseconds();
84+
Mono<Void> attempt = sleepMicros > 0
85+
? Mono.delay(Duration.ofNanos(MICROSECONDS.toNanos(sleepMicros))).then(attemptDecryptKey(keyDecryptor, operationTimeout))
86+
: attemptDecryptKey(keyDecryptor, operationTimeout);
87+
return attempt.onErrorMap(this::unWrapException);
88+
});
89+
}
90+
91+
private Mono<Void> attemptDecryptKey(final MongoKeyDecryptor keyDecryptor, @Nullable final Timeout operationTimeout) {
7792
SocketSettings socketSettings = SocketSettings.builder()
7893
.connectTimeout(timeoutMillis, MILLISECONDS)
7994
.readTimeout(timeoutMillis, MILLISECONDS)
@@ -85,7 +100,7 @@ Mono<Void> decryptKey(final MongoKeyDecryptor keyDecryptor, @Nullable final Time
85100

86101
LOGGER.info("Connecting to KMS server at " + serverAddress);
87102

88-
return Mono.<Void>create(sink -> {
103+
return Mono.<Boolean>create(sink -> {
89104
Stream stream = streamFactory.create(serverAddress);
90105
OperationContext operationContext = createOperationContext(operationTimeout, socketSettings);
91106
stream.openAsync(operationContext, new AsyncCompletionHandler<Void>() {
@@ -97,70 +112,109 @@ public void completed(@Nullable final Void ignored) {
97112
@Override
98113
public void failed(final Throwable t) {
99114
stream.close();
100-
handleError(t, operationContext, sink);
115+
if (keyDecryptor.fail()) {
116+
sink.success(true);
117+
} else {
118+
handleError(t, operationContext, sink);
119+
}
101120
}
102121
});
103-
}).onErrorMap(this::unWrapException);
122+
}).flatMap(shouldRetry -> {
123+
if (shouldRetry) {
124+
return decryptKey(keyDecryptor, operationTimeout);
125+
}
126+
return Mono.empty();
127+
});
104128
}
105129

106130
private void streamWrite(final Stream stream, final MongoKeyDecryptor keyDecryptor,
107-
final OperationContext operationContext, final MonoSink<Void> sink) {
131+
final OperationContext operationContext, final MonoSink<Boolean> sink) {
108132
List<ByteBuf> byteBufs = singletonList(new ByteBufNIO(keyDecryptor.getMessage()));
109133
stream.writeAsync(byteBufs, operationContext, new AsyncCompletionHandler<Void>() {
110134
@Override
111135
public void completed(@Nullable final Void aVoid) {
112-
streamRead(stream, keyDecryptor, operationContext, sink);
136+
streamRead(stream, keyDecryptor, operationContext, sink, true);
113137
}
114138

115139
@Override
116140
public void failed(final Throwable t) {
117141
stream.close();
118-
handleError(t, operationContext, sink);
142+
if (keyDecryptor.fail()) {
143+
sink.success(true);
144+
} else {
145+
handleError(t, operationContext, sink);
146+
}
119147
}
120148
});
121149
}
122150

123151
private void streamRead(final Stream stream, final MongoKeyDecryptor keyDecryptor,
124-
final OperationContext operationContext, final MonoSink<Void> sink) {
152+
final OperationContext operationContext, final MonoSink<Boolean> sink,
153+
final boolean firstRead) {
125154
int bytesNeeded = keyDecryptor.bytesNeeded();
126-
if (bytesNeeded > 0) {
155+
// After a fail()-triggered retry, libmongocrypt sets should_retry=true which causes
156+
// bytesNeeded() to return 0 until feedAndRetry() clears the flag. The first read of a
157+
// freshly-written request must happen regardless of bytesNeeded(); only recursive reads
158+
// can trust bytesNeeded()==0 as "done".
159+
if (bytesNeeded > 0 || firstRead) {
160+
int readSize = bytesNeeded > 0 ? bytesNeeded : MongoKeyDecryptor.DEFAULT_KMS_READ_SIZE;
127161
AsynchronousChannelStream asyncStream = (AsynchronousChannelStream) stream;
128-
ByteBuf buffer = asyncStream.getBuffer(bytesNeeded);
162+
ByteBuf buffer = asyncStream.getBuffer(readSize);
129163
long readTimeoutMS = operationContext.getTimeoutContext().getReadTimeoutMS();
130164
asyncStream.getChannel().read(buffer.asNIO(), readTimeoutMS, MILLISECONDS, null,
131165
new CompletionHandler<Integer, Void>() {
132166

133167
@Override
134168
public void completed(final Integer integer, final Void aVoid) {
135169
if (integer == -1) {
136-
sink.error(new MongoException(
137-
"Unexpected end of stream from KMS provider " + keyDecryptor.getKmsProvider()));
170+
buffer.release();
171+
stream.close();
172+
if (keyDecryptor.fail()) {
173+
sink.success(true);
174+
} else {
175+
sink.error(new MongoException(
176+
"Unexpected end of stream from KMS provider "
177+
+ keyDecryptor.getKmsProvider()));
178+
}
138179
return;
139180
}
140181
buffer.flip();
182+
boolean shouldRetry;
141183
try {
142-
keyDecryptor.feed(buffer.asNIO());
143-
buffer.release();
144-
streamRead(stream, keyDecryptor, operationContext, sink);
184+
shouldRetry = keyDecryptor.feedAndRetry(buffer.asNIO());
145185
} catch (Throwable t) {
186+
stream.close();
146187
sink.error(t);
188+
return;
189+
} finally {
190+
buffer.release();
191+
}
192+
if (shouldRetry) {
193+
stream.close();
194+
sink.success(true);
195+
} else {
196+
streamRead(stream, keyDecryptor, operationContext, sink, false);
147197
}
148198
}
149199

150200
@Override
151201
public void failed(final Throwable t, final Void aVoid) {
152202
buffer.release();
153203
stream.close();
154-
handleError(t, operationContext, sink);
204+
if (keyDecryptor.fail()) {
205+
sink.success(true);
206+
} else {
207+
handleError(t, operationContext, sink);
208+
}
155209
}
156210
});
157211
} else {
158212
stream.close();
159-
sink.success();
213+
sink.success(false);
160214
}
161215
}
162216

163-
private static void handleError(final Throwable t, final OperationContext operationContext, final MonoSink<Void> sink) {
217+
private static void handleError(final Throwable t, final OperationContext operationContext, final MonoSink<Boolean> sink) {
164218
if (isTimeoutException(t) && operationContext.getTimeoutContext().hasTimeoutMS()) {
165219
sink.error(TimeoutContext.createMongoTimeoutException(TIMEOUT_ERROR_MESSAGE, t));
166220
} else {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.reactivestreams.client;
18+
19+
import com.mongodb.ClientEncryptionSettings;
20+
import com.mongodb.client.AbstractClientSideEncryptionKmsRetryTest;
21+
import com.mongodb.client.vault.ClientEncryption;
22+
import com.mongodb.reactivestreams.client.syncadapter.SyncClientEncryption;
23+
import com.mongodb.reactivestreams.client.vault.ClientEncryptions;
24+
25+
public class ClientSideEncryptionKmsRetryTest extends AbstractClientSideEncryptionKmsRetryTest {
26+
@Override
27+
public ClientEncryption getClientEncryption(final ClientEncryptionSettings settings) {
28+
return new SyncClientEncryption(ClientEncryptions.create(settings));
29+
}
30+
}

0 commit comments

Comments
 (0)