Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .evergreen/.evg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,16 @@ functions:
set +o xtrace
MONGODB_URI="${MONGODB_URI}" KMS_TLS_ERROR_TYPE=${KMS_TLS_ERROR_TYPE} .evergreen/run-kms-tls-tests.sh

"run-kms-retry-test":
- command: shell.exec
type: "test"
params:
working_dir: "src"
script: |
${PREPARE_SHELL}
set +o xtrace
MONGODB_URI="${MONGODB_URI}" .evergreen/run-kms-retry-tests.sh

"run-csfle-aws-from-environment-test":
- command: shell.exec
type: "test"
Expand Down Expand Up @@ -1632,6 +1642,17 @@ tasks:
AUTH: "noauth"
SSL: "nossl"

- name: "test-kms-retry-task"
tags: [ "kms-retry" ]
commands:
- func: "start-mongo-orchestration"
vars:
TOPOLOGY: "server"
AUTH: "noauth"
SSL: "nossl"
- func: "start-csfle-servers"
- func: "run-kms-retry-test"

- name: "test-csfle-aws-from-environment-task"
tags: [ "csfle-aws-from-environment" ]
commands:
Expand Down Expand Up @@ -2528,6 +2549,12 @@ buildvariants:
tasks:
- name: ".kms-tls"

- matrix_name: "kms-retry-test"
matrix_spec: { os: "linux", version: [ "5.0" ], topology: [ "standalone" ] }
display_name: "CSFLE KMS Retry"
tasks:
- name: ".kms-retry"

- matrix_name: "csfle-aws-from-environment-test"
matrix_spec: { os: "linux", version: [ "5.0" ], topology: [ "standalone" ] }
display_name: "CSFLE AWS From Environment"
Expand Down
44 changes: 44 additions & 0 deletions .evergreen/run-kms-retry-tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/bin/bash

# Don't trace since the URI contains a password that shouldn't show up in the logs
set -o errexit # Exit the script with error if any of the commands fail

# Supported/used environment variables:
# MONGODB_URI Set the suggested connection MONGODB_URI (including credentials and topology info)

############################################
# Main Program #
############################################
RELATIVE_DIR_PATH="$(dirname "${BASH_SOURCE:-$0}")"
. "${RELATIVE_DIR_PATH}/setup-env.bash"
echo "Running KMS Retry tests"

cp ${JAVA_HOME}/lib/security/cacerts mongo-truststore
${JAVA_HOME}/bin/keytool -importcert -trustcacerts -file ${DRIVERS_TOOLS}/.evergreen/x509gen/ca.pem -keystore mongo-truststore -storepass changeit -storetype JKS -noprompt

export GRADLE_EXTRA_VARS="-Pssl.enabled=true -Pssl.trustStoreType=jks -Pssl.trustStore=`pwd`/mongo-truststore -Pssl.trustStorePassword=changeit"

./gradlew -version

# Disable errexit so both suites run and their exit codes can be captured below.
set +o errexit

./gradlew --stacktrace --info ${GRADLE_EXTRA_VARS} -Dorg.mongodb.test.uri=${MONGODB_URI} \
-Dorg.mongodb.test.kms.retry.ca.path="${DRIVERS_TOOLS}/.evergreen/x509gen/ca.pem" \
driver-sync:cleanTest driver-sync:test --tests ClientSideEncryptionKmsRetryProseTest
first=$?
echo $first

./gradlew --stacktrace --info ${GRADLE_EXTRA_VARS} -Dorg.mongodb.test.uri=${MONGODB_URI} \
-Dorg.mongodb.test.kms.retry.ca.path="${DRIVERS_TOOLS}/.evergreen/x509gen/ca.pem" \
driver-reactive-streams:cleanTest driver-reactive-streams:test --tests ClientSideEncryptionKmsRetryProseTest
Comment thread
rozza marked this conversation as resolved.
second=$?
echo $second

if [ $first -ne 0 ]; then
exit $first
elif [ $second -ne 0 ]; then
exit $second
else
exit 0
fi
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,13 @@ public long read(ByteBufferSet dest) throws IOException {
case NOT_HANDSHAKING:
case FINISHED:
UnwrapResult res = readAndUnwrap(Optional.of(dest));
bytesToReturn = res.bytesProduced;
if (res.wasClosed) {
return -1;
// JAVA-5391: return any bytes produced alongside close_notify; the next read
// sees shutdownReceived and returns -1. Fixed in upstream marianobarrios/tls-channel;
// this is the minimal patch until the vendored snapshot is refreshed.
return bytesToReturn > 0 ? bytesToReturn : -1;
}
bytesToReturn = res.bytesProduced;
handshakeStatus = res.lastHandshakeStatus;
break;
case NEED_TASK:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@
import java.io.Closeable;
import java.nio.channels.CompletionHandler;
import java.nio.channels.InterruptedByTimeoutException;
import java.time.Duration;
import java.util.List;
import java.util.Map;

import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.bson.assertions.Assertions.assertTrue;

Expand All @@ -74,6 +76,29 @@ public void close() {
}

Mono<Void> decryptKey(final MongoKeyDecryptor keyDecryptor, @Nullable final Timeout operationTimeout) {
return Mono.defer(() -> {
long sleepMicros = keyDecryptor.sleepMicroseconds();
if (sleepMicros > 0 && operationTimeout != null) {
operationTimeout.run(MICROSECONDS,
() -> { },
remainingMicros -> {
if (remainingMicros < sleepMicros) {
throw TimeoutContext.createMongoTimeoutException(TIMEOUT_ERROR_MESSAGE);
}
},
() -> {
throw TimeoutContext.createMongoTimeoutException(TIMEOUT_ERROR_MESSAGE);
});
}
Mono<Void> attempt = sleepMicros > 0
? Mono.delay(Duration.ofNanos(MICROSECONDS.toNanos(sleepMicros)))
.then(attemptDecryptKey(keyDecryptor, operationTimeout))
: attemptDecryptKey(keyDecryptor, operationTimeout);
return attempt;
}).onErrorMap(this::unWrapException);
}

private Mono<Void> attemptDecryptKey(final MongoKeyDecryptor keyDecryptor, @Nullable final Timeout operationTimeout) {
SocketSettings socketSettings = SocketSettings.builder()
.connectTimeout(timeoutMillis, MILLISECONDS)
.readTimeout(timeoutMillis, MILLISECONDS)
Expand All @@ -85,84 +110,119 @@ Mono<Void> decryptKey(final MongoKeyDecryptor keyDecryptor, @Nullable final Time

LOGGER.info("Connecting to KMS server at " + serverAddress);

return Mono.<Void>create(sink -> {
Stream stream = streamFactory.create(serverAddress);
return Mono.<Boolean>create(sink -> {
OperationContext operationContext = createOperationContext(operationTimeout, socketSettings);
Stream stream = streamFactory.create(serverAddress);
stream.openAsync(operationContext, new AsyncCompletionHandler<Void>() {
@Override
public void completed(@Nullable final Void ignored) {
streamWrite(stream, keyDecryptor, operationContext, sink);
try {
streamWrite(stream, keyDecryptor, operationContext, sink);
} catch (Throwable t) {
stream.close();
sink.error(t);
}
}

@Override
public void failed(final Throwable t) {
stream.close();
handleError(t, operationContext, sink);
failOrHandleError(t, keyDecryptor, operationContext, sink);
}
});
}).onErrorMap(this::unWrapException);
}).flatMap(shouldRetry -> {
if (shouldRetry) {
return decryptKey(keyDecryptor, operationTimeout);
}
return Mono.empty();
});
}

private void streamWrite(final Stream stream, final MongoKeyDecryptor keyDecryptor,
final OperationContext operationContext, final MonoSink<Void> sink) {
final OperationContext operationContext, final MonoSink<Boolean> sink) {
List<ByteBuf> byteBufs = singletonList(new ByteBufNIO(keyDecryptor.getMessage()));
stream.writeAsync(byteBufs, operationContext, new AsyncCompletionHandler<Void>() {
@Override
public void completed(@Nullable final Void aVoid) {
streamRead(stream, keyDecryptor, operationContext, sink);
try {
int bytesNeeded = keyDecryptor.bytesNeeded();
int readSize = bytesNeeded > 0 ? bytesNeeded : MongoKeyDecryptor.DEFAULT_KMS_READ_SIZE;
streamRead(stream, keyDecryptor, operationContext, sink, readSize);
} catch (Throwable t) {
stream.close();
sink.error(t);
}
}

@Override
public void failed(final Throwable t) {
stream.close();
handleError(t, operationContext, sink);
failOrHandleError(t, keyDecryptor, operationContext, sink);
}
});
}

private void streamRead(final Stream stream, final MongoKeyDecryptor keyDecryptor,
final OperationContext operationContext, final MonoSink<Void> sink) {
int bytesNeeded = keyDecryptor.bytesNeeded();
if (bytesNeeded > 0) {
AsynchronousChannelStream asyncStream = (AsynchronousChannelStream) stream;
ByteBuf buffer = asyncStream.getBuffer(bytesNeeded);
long readTimeoutMS = operationContext.getTimeoutContext().getReadTimeoutMS();
asyncStream.getChannel().read(buffer.asNIO(), readTimeoutMS, MILLISECONDS, null,
new CompletionHandler<Integer, Void>() {

@Override
public void completed(final Integer integer, final Void aVoid) {
if (integer == -1) {
sink.error(new MongoException(
"Unexpected end of stream from KMS provider " + keyDecryptor.getKmsProvider()));
return;
}
buffer.flip();
try {
keyDecryptor.feed(buffer.asNIO());
buffer.release();
streamRead(stream, keyDecryptor, operationContext, sink);
} catch (Throwable t) {
sink.error(t);
}
}

@Override
public void failed(final Throwable t, final Void aVoid) {
buffer.release();
stream.close();
handleError(t, operationContext, sink);
}
});
} else {
final OperationContext operationContext, final MonoSink<Boolean> sink,
final int readSize) {
if (readSize <= 0) {
stream.close();
sink.success();
sink.success(false);
return;
}
AsynchronousChannelStream asyncStream = (AsynchronousChannelStream) stream;
ByteBuf buffer = asyncStream.getBuffer(readSize);
long readTimeoutMS = operationContext.getTimeoutContext().getReadTimeoutMS();
asyncStream.getChannel().read(buffer.asNIO(), readTimeoutMS, MILLISECONDS, null,
new CompletionHandler<Integer, Void>() {

@Override
public void completed(final Integer integer, final Void aVoid) {
try {
if (integer == -1) {
buffer.release();
stream.close();
MongoException eof = new MongoException("Unexpected end of stream from KMS provider "
+ keyDecryptor.getKmsProvider());
failOrHandleError(eof, keyDecryptor, operationContext, sink);
return;
}
buffer.flip();
boolean shouldRetry;
try {
shouldRetry = keyDecryptor.feedAndRetry(buffer.asNIO());
} finally {
buffer.release();
}
if (shouldRetry) {
stream.close();
sink.success(true);
} else {
streamRead(stream, keyDecryptor, operationContext, sink,
keyDecryptor.bytesNeeded());
}
} catch (Throwable t) {
stream.close();
sink.error(t);
}
}

@Override
public void failed(final Throwable t, final Void aVoid) {
buffer.release();
stream.close();
failOrHandleError(t, keyDecryptor, operationContext, sink);
}
});
}

private static void handleError(final Throwable t, final OperationContext operationContext, final MonoSink<Void> sink) {
private static void failOrHandleError(final Throwable t, final MongoKeyDecryptor keyDecryptor,
final OperationContext operationContext, final MonoSink<Boolean> sink) {
if (isTimeoutException(t) && operationContext.getTimeoutContext().hasTimeoutMS()) {
sink.error(TimeoutContext.createMongoTimeoutException(TIMEOUT_ERROR_MESSAGE, t));
} else if (keyDecryptor.fail()) {
LOGGER.debug("Retrying KMS request after transient error", t);
sink.success(true);
} else {
sink.error(t);
Comment thread
rozza marked this conversation as resolved.
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.reactivestreams.client;

import com.mongodb.ClientEncryptionSettings;
import com.mongodb.client.AbstractClientSideEncryptionKmsRetryProseTest;
import com.mongodb.client.vault.ClientEncryption;
import com.mongodb.reactivestreams.client.syncadapter.SyncClientEncryption;
import com.mongodb.reactivestreams.client.vault.ClientEncryptions;

public class ClientSideEncryptionKmsRetryProseTest extends AbstractClientSideEncryptionKmsRetryProseTest {
@Override
public ClientEncryption getClientEncryption(final ClientEncryptionSettings settings) {
return new SyncClientEncryption(ClientEncryptions.create(settings));
}
}
Loading