Skip to content

Commit 9bd2276

Browse files
committed
Retry KMS requests on transient errors
Add libmongocrypt CAPI bindings for KMS retry support and wire retry logic through the sync and reactive driver stacks. Transient KMS HTTP and network errors are retried with backoff delays managed by libmongocrypt; retry is enabled unconditionally. - Add native bindings: mongocrypt_setopt_retry_kms, mongocrypt_kms_ctx_usleep, mongocrypt_kms_ctx_feed_with_retry, mongocrypt_kms_ctx_fail - Add sleepMicroseconds(), feedAndRetry(), fail() to MongoKeyDecryptor - Enable KMS retry unconditionally in MongoCryptImpl - Rewrite sync Crypt.decryptKey() with retry loop, timeout-aware - Add retry logic to reactive KeyManagementService.decryptKey() - Fix TlsChannelImpl.read() to preserve bytes delivered alongside close_notify (already fixed upstream in marianobarrios/tls-channel) - Add spec Section 24 KMS retry integration tests (sync + reactive) - Add Evergreen CI task for KMS retry tests JAVA-5391
1 parent a6ea5ff commit 9bd2276

12 files changed

Lines changed: 718 additions & 48 deletions

File tree

.evergreen/.evg.yml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,16 @@ functions:
764764
set +o xtrace
765765
MONGODB_URI="${MONGODB_URI}" KMS_TLS_ERROR_TYPE=${KMS_TLS_ERROR_TYPE} .evergreen/run-kms-tls-tests.sh
766766
767+
"run-kms-retry-test":
768+
- command: shell.exec
769+
type: "test"
770+
params:
771+
working_dir: "src"
772+
script: |
773+
${PREPARE_SHELL}
774+
set +o xtrace
775+
MONGODB_URI="${MONGODB_URI}" .evergreen/run-kms-retry-tests.sh
776+
767777
"run-csfle-aws-from-environment-test":
768778
- command: shell.exec
769779
type: "test"
@@ -1632,6 +1642,17 @@ tasks:
16321642
AUTH: "noauth"
16331643
SSL: "nossl"
16341644

1645+
- name: "test-kms-retry-task"
1646+
tags: [ "kms-retry" ]
1647+
commands:
1648+
- func: "start-mongo-orchestration"
1649+
vars:
1650+
TOPOLOGY: "server"
1651+
AUTH: "noauth"
1652+
SSL: "nossl"
1653+
- func: "start-csfle-servers"
1654+
- func: "run-kms-retry-test"
1655+
16351656
- name: "test-csfle-aws-from-environment-task"
16361657
tags: [ "csfle-aws-from-environment" ]
16371658
commands:
@@ -2528,6 +2549,12 @@ buildvariants:
25282549
tasks:
25292550
- name: ".kms-tls"
25302551

2552+
- matrix_name: "kms-retry-test"
2553+
matrix_spec: { os: "linux", version: [ "5.0" ], topology: [ "standalone" ] }
2554+
display_name: "CSFLE KMS Retry"
2555+
tasks:
2556+
- name: ".kms-retry"
2557+
25312558
- matrix_name: "csfle-aws-from-environment-test"
25322559
matrix_spec: { os: "linux", version: [ "5.0" ], topology: [ "standalone" ] }
25332560
display_name: "CSFLE AWS From Environment"

.evergreen/run-kms-retry-tests.sh

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#!/bin/bash
2+
3+
# Don't trace since the URI contains a password that shouldn't show up in the logs
4+
set -o errexit # Exit the script with error if any of the commands fail
5+
6+
# Supported/used environment variables:
7+
# MONGODB_URI Set the suggested connection MONGODB_URI (including credentials and topology info)
8+
9+
############################################
10+
# Main Program #
11+
############################################
12+
RELATIVE_DIR_PATH="$(dirname "${BASH_SOURCE:-$0}")"
13+
. "${RELATIVE_DIR_PATH}/setup-env.bash"
14+
echo "Running KMS Retry tests"
15+
16+
cp ${JAVA_HOME}/lib/security/cacerts mongo-truststore
17+
${JAVA_HOME}/bin/keytool -importcert -trustcacerts -file ${DRIVERS_TOOLS}/.evergreen/x509gen/ca.pem -keystore mongo-truststore -storepass changeit -storetype JKS -noprompt
18+
19+
export GRADLE_EXTRA_VARS="-Pssl.enabled=true -Pssl.trustStoreType=jks -Pssl.trustStore=`pwd`/mongo-truststore -Pssl.trustStorePassword=changeit"
20+
21+
./gradlew -version
22+
23+
# Disable errexit so both suites run and their exit codes can be captured below.
24+
set +o errexit
25+
26+
./gradlew --stacktrace --info ${GRADLE_EXTRA_VARS} -Dorg.mongodb.test.uri=${MONGODB_URI} \
27+
-Dorg.mongodb.test.kms.retry.ca.path="${DRIVERS_TOOLS}/.evergreen/x509gen/ca.pem" \
28+
driver-sync:cleanTest driver-sync:test --tests ClientSideEncryptionKmsRetryProseTest
29+
first=$?
30+
echo $first
31+
32+
./gradlew --stacktrace --info ${GRADLE_EXTRA_VARS} -Dorg.mongodb.test.uri=${MONGODB_URI} \
33+
-Dorg.mongodb.test.kms.retry.ca.path="${DRIVERS_TOOLS}/.evergreen/x509gen/ca.pem" \
34+
driver-reactive-streams:cleanTest driver-reactive-streams:test --tests ClientSideEncryptionKmsRetryProseTest
35+
second=$?
36+
echo $second
37+
38+
if [ $first -ne 0 ]; then
39+
exit $first
40+
elif [ $second -ne 0 ]; then
41+
exit $second
42+
else
43+
exit 0
44+
fi

driver-core/src/main/com/mongodb/internal/connection/tlschannel/impl/TlsChannelImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,10 +236,13 @@ public long read(ByteBufferSet dest) throws IOException {
236236
case NOT_HANDSHAKING:
237237
case FINISHED:
238238
UnwrapResult res = readAndUnwrap(Optional.of(dest));
239+
bytesToReturn = res.bytesProduced;
239240
if (res.wasClosed) {
240-
return -1;
241+
// JAVA-5391: return any bytes produced alongside close_notify; the next read
242+
// sees shutdownReceived and returns -1. Fixed in upstream marianobarrios/tls-channel;
243+
// this is the minimal patch until the vendored snapshot is refreshed.
244+
return bytesToReturn > 0 ? bytesToReturn : -1;
241245
}
242-
bytesToReturn = res.bytesProduced;
243246
handshakeStatus = res.lastHandshakeStatus;
244247
break;
245248
case NEED_TASK:

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/KeyManagementService.java

Lines changed: 98 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,12 @@
4848
import java.io.Closeable;
4949
import java.nio.channels.CompletionHandler;
5050
import java.nio.channels.InterruptedByTimeoutException;
51+
import java.time.Duration;
5152
import java.util.List;
5253
import java.util.Map;
5354

5455
import static java.util.Collections.singletonList;
56+
import static java.util.concurrent.TimeUnit.MICROSECONDS;
5557
import static java.util.concurrent.TimeUnit.MILLISECONDS;
5658
import static org.bson.assertions.Assertions.assertTrue;
5759

@@ -74,6 +76,29 @@ public void close() {
7476
}
7577

7678
Mono<Void> decryptKey(final MongoKeyDecryptor keyDecryptor, @Nullable final Timeout operationTimeout) {
79+
return Mono.defer(() -> {
80+
long sleepMicros = keyDecryptor.sleepMicroseconds();
81+
if (sleepMicros > 0 && operationTimeout != null) {
82+
operationTimeout.run(MICROSECONDS,
83+
() -> { },
84+
remainingMicros -> {
85+
if (remainingMicros < sleepMicros) {
86+
throw TimeoutContext.createMongoTimeoutException(TIMEOUT_ERROR_MESSAGE);
87+
}
88+
},
89+
() -> {
90+
throw TimeoutContext.createMongoTimeoutException(TIMEOUT_ERROR_MESSAGE);
91+
});
92+
}
93+
Mono<Void> attempt = sleepMicros > 0
94+
? Mono.delay(Duration.ofNanos(MICROSECONDS.toNanos(sleepMicros)))
95+
.then(attemptDecryptKey(keyDecryptor, operationTimeout))
96+
: attemptDecryptKey(keyDecryptor, operationTimeout);
97+
return attempt.onErrorMap(this::unWrapException);
98+
});
99+
}
100+
101+
private Mono<Void> attemptDecryptKey(final MongoKeyDecryptor keyDecryptor, @Nullable final Timeout operationTimeout) {
77102
SocketSettings socketSettings = SocketSettings.builder()
78103
.connectTimeout(timeoutMillis, MILLISECONDS)
79104
.readTimeout(timeoutMillis, MILLISECONDS)
@@ -85,84 +110,120 @@ Mono<Void> decryptKey(final MongoKeyDecryptor keyDecryptor, @Nullable final Time
85110

86111
LOGGER.info("Connecting to KMS server at " + serverAddress);
87112

88-
return Mono.<Void>create(sink -> {
89-
Stream stream = streamFactory.create(serverAddress);
113+
return Mono.<Boolean>create(sink -> {
90114
OperationContext operationContext = createOperationContext(operationTimeout, socketSettings);
115+
Stream stream = streamFactory.create(serverAddress);
91116
stream.openAsync(operationContext, new AsyncCompletionHandler<Void>() {
92117
@Override
93118
public void completed(@Nullable final Void ignored) {
94-
streamWrite(stream, keyDecryptor, operationContext, sink);
119+
try {
120+
streamWrite(stream, keyDecryptor, operationContext, sink);
121+
} catch (Throwable t) {
122+
stream.close();
123+
sink.error(t);
124+
}
95125
}
96126

97127
@Override
98128
public void failed(final Throwable t) {
99129
stream.close();
100-
handleError(t, operationContext, sink);
130+
failOrHandleError(t, keyDecryptor, operationContext, sink);
101131
}
102132
});
103-
}).onErrorMap(this::unWrapException);
133+
}).flatMap(shouldRetry -> {
134+
if (shouldRetry) {
135+
return decryptKey(keyDecryptor, operationTimeout);
136+
}
137+
return Mono.empty();
138+
});
104139
}
105140

106141
private void streamWrite(final Stream stream, final MongoKeyDecryptor keyDecryptor,
107-
final OperationContext operationContext, final MonoSink<Void> sink) {
142+
final OperationContext operationContext, final MonoSink<Boolean> sink) {
108143
List<ByteBuf> byteBufs = singletonList(new ByteBufNIO(keyDecryptor.getMessage()));
109144
stream.writeAsync(byteBufs, operationContext, new AsyncCompletionHandler<Void>() {
110145
@Override
111146
public void completed(@Nullable final Void aVoid) {
112-
streamRead(stream, keyDecryptor, operationContext, sink);
147+
try {
148+
int bytesNeeded = keyDecryptor.bytesNeeded();
149+
int readSize = bytesNeeded > 0 ? bytesNeeded : MongoKeyDecryptor.DEFAULT_KMS_READ_SIZE;
150+
streamRead(stream, keyDecryptor, operationContext, sink, readSize);
151+
} catch (Throwable t) {
152+
stream.close();
153+
sink.error(t);
154+
}
113155
}
114156

115157
@Override
116158
public void failed(final Throwable t) {
117159
stream.close();
118-
handleError(t, operationContext, sink);
160+
failOrHandleError(t, keyDecryptor, operationContext, sink);
119161
}
120162
});
121163
}
122164

123165
private void streamRead(final Stream stream, final MongoKeyDecryptor keyDecryptor,
124-
final OperationContext operationContext, final MonoSink<Void> sink) {
125-
int bytesNeeded = keyDecryptor.bytesNeeded();
126-
if (bytesNeeded > 0) {
127-
AsynchronousChannelStream asyncStream = (AsynchronousChannelStream) stream;
128-
ByteBuf buffer = asyncStream.getBuffer(bytesNeeded);
129-
long readTimeoutMS = operationContext.getTimeoutContext().getReadTimeoutMS();
130-
asyncStream.getChannel().read(buffer.asNIO(), readTimeoutMS, MILLISECONDS, null,
131-
new CompletionHandler<Integer, Void>() {
132-
133-
@Override
134-
public void completed(final Integer integer, final Void aVoid) {
166+
final OperationContext operationContext, final MonoSink<Boolean> sink,
167+
final int readSize) {
168+
if (readSize <= 0) {
169+
stream.close();
170+
sink.success(false);
171+
return;
172+
}
173+
AsynchronousChannelStream asyncStream = (AsynchronousChannelStream) stream;
174+
ByteBuf buffer = asyncStream.getBuffer(readSize);
175+
long readTimeoutMS = operationContext.getTimeoutContext().getReadTimeoutMS();
176+
asyncStream.getChannel().read(buffer.asNIO(), readTimeoutMS, MILLISECONDS, null,
177+
new CompletionHandler<Integer, Void>() {
178+
179+
@Override
180+
public void completed(final Integer integer, final Void aVoid) {
181+
try {
135182
if (integer == -1) {
136-
sink.error(new MongoException(
137-
"Unexpected end of stream from KMS provider " + keyDecryptor.getKmsProvider()));
183+
buffer.release();
184+
stream.close();
185+
MongoException eof = new MongoException(
186+
"Unexpected end of stream from KMS provider "
187+
+ keyDecryptor.getKmsProvider());
188+
failOrHandleError(eof, keyDecryptor, operationContext, sink);
138189
return;
139190
}
140191
buffer.flip();
192+
boolean shouldRetry;
141193
try {
142-
keyDecryptor.feed(buffer.asNIO());
194+
shouldRetry = keyDecryptor.feedAndRetry(buffer.asNIO());
195+
} finally {
143196
buffer.release();
144-
streamRead(stream, keyDecryptor, operationContext, sink);
145-
} catch (Throwable t) {
146-
sink.error(t);
147197
}
148-
}
149-
150-
@Override
151-
public void failed(final Throwable t, final Void aVoid) {
152-
buffer.release();
198+
if (shouldRetry) {
199+
stream.close();
200+
sink.success(true);
201+
} else {
202+
streamRead(stream, keyDecryptor, operationContext, sink,
203+
keyDecryptor.bytesNeeded());
204+
}
205+
} catch (Throwable t) {
153206
stream.close();
154-
handleError(t, operationContext, sink);
207+
sink.error(t);
155208
}
156-
});
157-
} else {
158-
stream.close();
159-
sink.success();
160-
}
209+
}
210+
211+
@Override
212+
public void failed(final Throwable t, final Void aVoid) {
213+
buffer.release();
214+
stream.close();
215+
failOrHandleError(t, keyDecryptor, operationContext, sink);
216+
}
217+
});
161218
}
162219

163-
private static void handleError(final Throwable t, final OperationContext operationContext, final MonoSink<Void> sink) {
220+
private static void failOrHandleError(final Throwable t, final MongoKeyDecryptor keyDecryptor,
221+
final OperationContext operationContext, final MonoSink<Boolean> sink) {
164222
if (isTimeoutException(t) && operationContext.getTimeoutContext().hasTimeoutMS()) {
165223
sink.error(TimeoutContext.createMongoTimeoutException(TIMEOUT_ERROR_MESSAGE, t));
224+
} else if (keyDecryptor.fail()) {
225+
LOGGER.debug("Retrying KMS request after transient error", t);
226+
sink.success(true);
166227
} else {
167228
sink.error(t);
168229
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.reactivestreams.client;
18+
19+
import com.mongodb.ClientEncryptionSettings;
20+
import com.mongodb.client.AbstractClientSideEncryptionKmsRetryProseTest;
21+
import com.mongodb.client.vault.ClientEncryption;
22+
import com.mongodb.reactivestreams.client.syncadapter.SyncClientEncryption;
23+
import com.mongodb.reactivestreams.client.vault.ClientEncryptions;
24+
25+
public class ClientSideEncryptionKmsRetryProseTest extends AbstractClientSideEncryptionKmsRetryProseTest {
26+
@Override
27+
public ClientEncryption getClientEncryption(final ClientEncryptionSettings settings) {
28+
return new SyncClientEncryption(ClientEncryptions.create(settings));
29+
}
30+
}

0 commit comments

Comments
 (0)