diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle index c716d5b3d08..a7c58d549c3 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle @@ -14,6 +14,8 @@ muzzle { // KotlinAwareHandlerInstrumentation references Publisher from reactive-streams, // which is not bundled in spring-messaging but is always present when Spring Kafka is. extraDependency 'org.reactivestreams:reactive-streams:1.0.4' + // Spring Cloud AWS error-handler instrumentations reference ListenerExecutionFailedException. + extraDependency 'io.awspring.cloud:spring-cloud-aws-sqs:3.0.1' } } @@ -45,6 +47,7 @@ kotlin { dependencies { compileOnly group: 'org.springframework', name: 'spring-messaging', version: '4.0.0.RELEASE' compileOnly 'org.reactivestreams:reactive-streams:1.0.4' + compileOnly group: 'io.awspring.cloud', name: 'spring-cloud-aws-sqs', version: '3.0.1' testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-common') // capture SQS send and receive spans, propagate trace details in messages @@ -69,6 +72,7 @@ dependencies { // KotlinAwareHandlerInstrumentation relies on the reactive-streams and reactor instrumentation testImplementation project(':dd-java-agent:instrumentation:reactive-streams-1.0') testImplementation project(':dd-java-agent:instrumentation:reactor-core-3.1') + testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation') testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test' testImplementation 'org.apache.kafka:kafka-clients:3.8.0' diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringCloudAwsErrorHandlerHelper.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringCloudAwsErrorHandlerHelper.java new file mode 100644 index 00000000000..d877a22138a --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringCloudAwsErrorHandlerHelper.java @@ -0,0 +1,43 @@ +package datadog.trace.instrumentation.springmessaging; + +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException; +import java.util.function.BiConsumer; + +public final class SpringCloudAwsErrorHandlerHelper { + private SpringCloudAwsErrorHandlerHelper() {} + + public static ListenerExecutionFailedException findListenerExecutionFailedException( + Throwable error) { + Throwable current = error; + while (current != null && !(current instanceof ListenerExecutionFailedException)) { + Throwable cause = current.getCause(); + if (cause == current) { + return null; + } + current = cause; + } + return (ListenerExecutionFailedException) current; + } + + public static final class CleanupOnError implements BiConsumer { + private final ContextStore contextStore; + + public CleanupOnError(ContextStore contextStore) { + this.contextStore = contextStore; + } + + @Override + public void accept(Object ignored, Throwable error) { + if (error == null) { + return; + } + ListenerExecutionFailedException listenerException = + findListenerExecutionFailedException(error); + if (listenerException != null) { + SpringMessageErrorHandlerHelper.cancelContinuation(contextStore.get(listenerException)); + } + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringCloudAwsErrorHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringCloudAwsErrorHandlerInstrumentation.java new file mode 100644 index 00000000000..05e11c8c3aa --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringCloudAwsErrorHandlerInstrumentation.java @@ -0,0 +1,106 @@ +package datadog.trace.instrumentation.springmessaging; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException; +import java.util.Map; +import net.bytebuddy.asm.Advice; + +@AutoService(InstrumenterModule.class) +public final class SpringCloudAwsErrorHandlerInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + public SpringCloudAwsErrorHandlerInstrumentation() { + super("spring-messaging", "spring-messaging-4"); + } + + @Override + public String instrumentedType() { + return "io.awspring.cloud.sqs.listener.pipeline.ErrorHandlerExecutionStage"; + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".SpringMessageErrorHandlerHelper", + packageName + ".SpringCloudAwsErrorHandlerHelper" + }; + } + + @Override + public Map contextStore() { + return singletonMap(ListenerExecutionFailedException.class.getName(), State.class.getName()); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(named("handleError")) + .and(takesArguments(2)) + .and(takesArgument(1, Throwable.class)), + SpringCloudAwsErrorHandlerInstrumentation.class.getName() + + "$ActivateErrorHandlerContinuation"); + transformer.applyAdvice( + isMethod() + .and(named("handleErrors")) + .and(takesArguments(2)) + .and(takesArgument(1, Throwable.class)), + SpringCloudAwsErrorHandlerInstrumentation.class.getName() + + "$ActivateErrorHandlerContinuation"); + transformer.applyAdvice( + isMethod().and(named("process")).and(takesArguments(2)), + SpringCloudAwsErrorHandlerInstrumentation.class.getName() + "$CleanupContinuation"); + transformer.applyAdvice( + isMethod().and(named("processMany")).and(takesArguments(2)), + SpringCloudAwsErrorHandlerInstrumentation.class.getName() + "$CleanupContinuation"); + } + + public static class ActivateErrorHandlerContinuation { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope onEnter(@Advice.Argument(1) Throwable error) { + ListenerExecutionFailedException listenerException = + SpringCloudAwsErrorHandlerHelper.findListenerExecutionFailedException(error); + if (listenerException == null) { + return null; + } + ContextStore contextStore = + InstrumentationContext.get(ListenerExecutionFailedException.class, State.class); + State state = contextStore.get(listenerException); + return SpringMessageErrorHandlerHelper.activateContinuation(state); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit(@Advice.Enter AgentScope scope) { + if (scope != null) { + scope.close(); + } + } + } + + public static class CleanupContinuation { + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.Return(readOnly = false) java.util.concurrent.CompletableFuture result) { + if (result != null) { + ContextStore contextStore = + InstrumentationContext.get(ListenerExecutionFailedException.class, State.class); + result = + result.whenComplete(new SpringCloudAwsErrorHandlerHelper.CleanupOnError(contextStore)); + } + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringCloudAwsListenerAdapterInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringCloudAwsListenerAdapterInstrumentation.java new file mode 100644 index 00000000000..a0bd7c58761 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringCloudAwsListenerAdapterInstrumentation.java @@ -0,0 +1,80 @@ +package datadog.trace.instrumentation.springmessaging; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException; +import java.util.Map; +import net.bytebuddy.asm.Advice; + +@AutoService(InstrumenterModule.class) +public final class SpringCloudAwsListenerAdapterInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + public SpringCloudAwsListenerAdapterInstrumentation() { + super("spring-messaging", "spring-messaging-4"); + } + + @Override + public String instrumentedType() { + return "io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter"; + } + + @Override + public String[] helperClassNames() { + return new String[] {packageName + ".SpringMessageErrorHandlerHelper"}; + } + + @Override + public Map contextStore() { + return singletonMap(ListenerExecutionFailedException.class.getName(), State.class.getName()); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("invokeHandler")).and(takesArguments(1)), + SpringCloudAwsListenerAdapterInstrumentation.class.getName() + "$InvokeHandlerAdvice"); + transformer.applyAdvice( + isMethod() + .and(named("createListenerException")) + .and(takesArguments(2)) + .and(takesArgument(1, Throwable.class)), + SpringCloudAwsListenerAdapterInstrumentation.class.getName() + + "$CreateListenerExceptionAdvice"); + } + + public static class InvokeHandlerAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter() { + SpringMessageErrorHandlerHelper.enterAwsListenerInvocation(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit() { + SpringMessageErrorHandlerHelper.clearPendingContinuation(); + SpringMessageErrorHandlerHelper.exitAwsListenerInvocation(); + } + } + + public static class CreateListenerExceptionAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Advice.Return ListenerExecutionFailedException listenerException) { + ContextStore contextStore = + InstrumentationContext.get(ListenerExecutionFailedException.class, State.class); + State state = contextStore.putIfAbsent(listenerException, State.FACTORY); + SpringMessageErrorHandlerHelper.transferPendingContinuation(state); + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageErrorHandlerHelper.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageErrorHandlerHelper.java new file mode 100644 index 00000000000..d0a5dbcf507 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageErrorHandlerHelper.java @@ -0,0 +1,78 @@ +package datadog.trace.instrumentation.springmessaging; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.captureSpan; + +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; + +public final class SpringMessageErrorHandlerHelper { + private static final ThreadLocal AWS_LISTENER_DEPTH = ThreadLocal.withInitial(() -> 0); + private static final ThreadLocal PENDING_CONTINUATION = + new ThreadLocal<>(); + + private SpringMessageErrorHandlerHelper() {} + + public static void enterAwsListenerInvocation() { + AWS_LISTENER_DEPTH.set(AWS_LISTENER_DEPTH.get() + 1); + } + + public static void exitAwsListenerInvocation() { + int depth = AWS_LISTENER_DEPTH.get() - 1; + if (depth <= 0) { + AWS_LISTENER_DEPTH.remove(); + } else { + AWS_LISTENER_DEPTH.set(depth); + } + } + + public static boolean isInAwsListenerInvocation() { + return AWS_LISTENER_DEPTH.get() > 0; + } + + public static void capturePendingContinuation(AgentSpan span) { + if (span == null) { + return; + } + AgentScope.Continuation existing = PENDING_CONTINUATION.get(); + if (existing != null) { + existing.cancel(); + } + PENDING_CONTINUATION.set(captureSpan(span)); + } + + public static void transferPendingContinuation(State state) { + AgentScope.Continuation continuation = PENDING_CONTINUATION.get(); + PENDING_CONTINUATION.remove(); + if (continuation == null) { + return; + } + if (state == null) { + continuation.cancel(); + return; + } + state.setOrCancelContinuation(continuation); + } + + public static void clearPendingContinuation() { + AgentScope.Continuation continuation = PENDING_CONTINUATION.get(); + PENDING_CONTINUATION.remove(); + if (continuation != null) { + continuation.cancel(); + } + } + + public static AgentScope activateContinuation(State state) { + if (state == null) { + return null; + } + AgentScope.Continuation continuation = state.getAndResetContinuation(); + return continuation != null ? continuation.activate() : null; + } + + public static void cancelContinuation(State state) { + if (state != null) { + state.closeContinuation(); + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java index 48538837dfe..e43e5756a4a 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java @@ -55,6 +55,7 @@ public String[] helperClassNames() { packageName + ".SpringMessageDecorator", packageName + ".SpringMessageExtractAdapter", packageName + ".SpringMessageExtractAdapter$1", + packageName + ".SpringMessageErrorHandlerHelper", }; } @@ -95,10 +96,13 @@ public static void onExit( return; } AgentSpan span = scope.span(); - scope.close(); if (null != error) { DECORATE.onError(span, error); + if (SpringMessageErrorHandlerHelper.isInAwsListenerInvocation()) { + SpringMessageErrorHandlerHelper.capturePendingContinuation(span); + } } + scope.close(); if (result != null) { Object wrappedResult = AsyncResultExtensions.wrapAsyncResult(result, result.getClass(), span); diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/SpringListenerSQSTest.groovy b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/SpringListenerSQSTest.groovy index 45156d53802..00dff41d8c7 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/SpringListenerSQSTest.groovy +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/SpringListenerSQSTest.groovy @@ -8,6 +8,8 @@ import datadog.trace.api.config.GeneralConfig import datadog.trace.bootstrap.instrumentation.api.Tags import datadog.trace.core.DDSpan import datadog.trace.instrumentation.aws.ExpectedQueryParams +import listener.ErrorHandlerConfig +import listener.ErrorHandlerObservation import io.awspring.cloud.sqs.operations.SqsTemplate import listener.Config import listener.TestListener @@ -198,6 +200,90 @@ class SpringListenerSQSTest extends InstrumentationSpecification { context.close() } + def "blocking error handler keeps spring.consume span active"() { + setup: + def context = new AnnotationConfigApplicationContext(ErrorHandlerConfig) + def address = context.getBean(SQSRestServer).waitUntilStarted().localAddress() + def observation = context.getBean(ErrorHandlerObservation) + def template = SqsTemplate.newTemplate(context.getBean(SqsAsyncClient)) + TEST_WRITER.waitForTraces(2) + TEST_WRITER.clear() + + when: + TraceUtils.runUnderTrace("parent") { + template.sendAsync("SpringListenerSQSError", "boom").get() + } + observation.awaitBlockingErrorHandler() + + then: + assert observation.blockingError != null + assertErrorHandlerTrace( + address, + "SpringListenerSQSError", + "ErrorHandlerListener.observeFailure", + "ObservingErrorHandler.handle", + "listener failurea") + + cleanup: + context?.close() + } + + def "async error handler keeps spring.consume span active"() { + setup: + def context = new AnnotationConfigApplicationContext(ErrorHandlerConfig) + def address = context.getBean(SQSRestServer).waitUntilStarted().localAddress() + def observation = context.getBean(ErrorHandlerObservation) + def template = SqsTemplate.newTemplate(context.getBean(SqsAsyncClient)) + TEST_WRITER.waitForTraces(2) + TEST_WRITER.clear() + + when: + TraceUtils.runUnderTrace("parent") { + template.sendAsync("SpringListenerSQSAsyncError", "boom").get() + } + observation.awaitAsyncErrorHandler() + + then: + assert observation.asyncError != null + assertErrorHandlerTrace( + address, + "SpringListenerSQSAsyncError", + "ErrorHandlerListener.observeAsyncFailure", + "ObservingAsyncErrorHandler.handle", + "async listener failure") + + cleanup: + context?.close() + } + + private void assertErrorHandlerTrace( + InetSocketAddress address, + String queueName, + String listenerResourceName, + String errorHandlerResourceName, + String errorMessage) { + def sendingSpan + assertTraces(4, SORT_TRACES_BY_START) { + sortSpansByStart() + trace(3) { + basicSpan(it, "parent") + getQueueUrl(it, address, span(0), queueName) + sendMessage(it, address, span(0), queueName) + sendingSpan = span(2) + } + trace(1) { + receiveMessage(it, address, sendingSpan, queueName) + } + trace(2) { + springErrorSqsListener(it, sendingSpan, listenerResourceName, errorMessage) + tracedErrorHandler(it, span(0), errorHandlerResourceName) + } + trace(1) { + deleteMessageBatch(it, address, queueName) + } + } + } + static sendMessage(TraceAssert traceAssert, InetSocketAddress address, DDSpan parentSpan, String queueName = "SpringListenerSQS") { traceAssert.span { serviceName "sqs" @@ -295,6 +381,36 @@ class SpringListenerSQSTest extends InstrumentationSpecification { } } + static springErrorSqsListener( + TraceAssert traceAssert, DDSpan parentSpan, String expectedResourceName, String errorMessage) { + traceAssert.span { + serviceName "my-service" + operationName "spring.consume" + resourceName expectedResourceName + spanType DDSpanTypes.MESSAGE_CONSUMER + errored true + measured true + childOf(parentSpan) + tags { + "$Tags.COMPONENT" "spring-messaging" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + errorTags(RuntimeException, errorMessage) + defaultTags(true) + } + } + } + + static tracedErrorHandler( + TraceAssert traceAssert, DDSpan parentSpan, String expectedResourceName) { + traceAssert.span { + serviceName "my-service" + operationName "error.handler" + resourceName expectedResourceName + childOf(parentSpan) + errored false + } + } + static deleteMessageBatch(TraceAssert traceAssert, InetSocketAddress address, String queueName = "SpringListenerSQS") { traceAssert.span { serviceName "sqs" diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ErrorHandlerConfig.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ErrorHandlerConfig.java new file mode 100644 index 00000000000..16f153fc2d7 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ErrorHandlerConfig.java @@ -0,0 +1,81 @@ +package listener; + +import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration; +import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; +import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode; +import jakarta.annotation.PreDestroy; +import java.net.URI; +import org.elasticmq.rest.sqs.SQSRestServer; +import org.elasticmq.rest.sqs.SQSRestServerBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; + +@Configuration +@Import(SqsBootstrapConfiguration.class) +public class ErrorHandlerConfig { + + @Bean + public SQSRestServer sqsRestServer() { + return SQSRestServerBuilder.withInterface("localhost").withDynamicPort().start(); + } + + @Bean + public SqsAsyncClient sqsAsyncClient() { + int port = sqsRestServer().waitUntilStarted().localAddress().getPort(); + return SqsAsyncClient.builder() + .credentialsProvider(AnonymousCredentialsProvider.create()) + .region(Region.AWS_GLOBAL) + .endpointOverride(URI.create("http://localhost:" + port)) + .build(); + } + + @Bean + public ErrorHandlerObservation errorHandlerObservation() { + return new ErrorHandlerObservation(); + } + + @Bean + public ErrorHandlerListener errorHandlerListener() { + return new ErrorHandlerListener(); + } + + @Bean + public ObservingErrorHandler observingErrorHandler(ErrorHandlerObservation observation) { + return new ObservingErrorHandler(observation); + } + + @Bean + public ObservingAsyncErrorHandler observingAsyncErrorHandler( + ErrorHandlerObservation observation) { + return new ObservingAsyncErrorHandler(observation); + } + + @Bean + public SqsMessageListenerContainerFactory sqsListenerContainerFactory( + SqsAsyncClient sqsAsyncClient, ObservingErrorHandler errorHandler) { + return SqsMessageListenerContainerFactory.builder() + .sqsAsyncClient(sqsAsyncClient) + .configure(options -> options.acknowledgementMode(AcknowledgementMode.ALWAYS)) + .errorHandler(errorHandler) + .build(); + } + + @Bean + public SqsMessageListenerContainerFactory sqsAsyncListenerContainerFactory( + SqsAsyncClient sqsAsyncClient, ObservingAsyncErrorHandler errorHandler) { + return SqsMessageListenerContainerFactory.builder() + .sqsAsyncClient(sqsAsyncClient) + .configure(options -> options.acknowledgementMode(AcknowledgementMode.ALWAYS)) + .errorHandler(errorHandler) + .build(); + } + + @PreDestroy + public void destroy() { + sqsRestServer().stopAndWait(); + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ErrorHandlerListener.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ErrorHandlerListener.java new file mode 100644 index 00000000000..5324e828a3b --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ErrorHandlerListener.java @@ -0,0 +1,19 @@ +package listener; + +import io.awspring.cloud.sqs.annotation.SqsListener; +import java.util.concurrent.CompletableFuture; + +public class ErrorHandlerListener { + + @SqsListener(queueNames = "SpringListenerSQSError", factory = "sqsListenerContainerFactory") + public void observeFailure(String message) { + throw new RuntimeException("listener failure"); + } + + @SqsListener( + queueNames = "SpringListenerSQSAsyncError", + factory = "sqsAsyncListenerContainerFactory") + public CompletableFuture observeAsyncFailure(String message) { + throw new RuntimeException("async listener failure"); + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ErrorHandlerObservation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ErrorHandlerObservation.java new file mode 100644 index 00000000000..d143d9a91d2 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ErrorHandlerObservation.java @@ -0,0 +1,51 @@ +package listener; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class ErrorHandlerObservation { + private static final long TIMEOUT_SECONDS = 15L; + + private volatile Throwable blockingError; + private volatile Throwable asyncError; + + private final CountDownLatch blockingErrorHandled = new CountDownLatch(1); + private final CountDownLatch asyncErrorHandled = new CountDownLatch(1); + + public Throwable getBlockingError() { + return blockingError; + } + + public Throwable getAsyncError() { + return asyncError; + } + + public void recordBlockingErrorHandler(Throwable error) { + blockingError = error; + blockingErrorHandled.countDown(); + } + + public void recordAsyncErrorHandler(Throwable error) { + asyncError = error; + asyncErrorHandled.countDown(); + } + + public void awaitBlockingErrorHandler() { + await(blockingErrorHandled, "blocking error handler"); + } + + public void awaitAsyncErrorHandler() { + await(asyncErrorHandled, "async error handler"); + } + + private static void await(CountDownLatch latch, String description) { + try { + if (!latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + throw new AssertionError("timed out waiting for " + description); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("interrupted while waiting for " + description, e); + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ObservingAsyncErrorHandler.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ObservingAsyncErrorHandler.java new file mode 100644 index 00000000000..8b71c7be426 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ObservingAsyncErrorHandler.java @@ -0,0 +1,21 @@ +package listener; + +import datadog.trace.api.Trace; +import java.util.concurrent.CompletableFuture; +import org.springframework.messaging.Message; + +public class ObservingAsyncErrorHandler + implements io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler { + private final ErrorHandlerObservation observation; + + public ObservingAsyncErrorHandler(ErrorHandlerObservation observation) { + this.observation = observation; + } + + @Override + @Trace(operationName = "error.handler", resourceName = "ObservingAsyncErrorHandler.handle") + public CompletableFuture handle(Message message, Throwable t) { + observation.recordAsyncErrorHandler(t); + return CompletableFuture.completedFuture(null); + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ObservingErrorHandler.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ObservingErrorHandler.java new file mode 100644 index 00000000000..32c80c80a06 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/java/listener/ObservingErrorHandler.java @@ -0,0 +1,19 @@ +package listener; + +import datadog.trace.api.Trace; +import org.springframework.messaging.Message; + +public class ObservingErrorHandler + implements io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler { + private final ErrorHandlerObservation observation; + + public ObservingErrorHandler(ErrorHandlerObservation observation) { + this.observation = observation; + } + + @Override + @Trace(operationName = "error.handler", resourceName = "ObservingErrorHandler.handle") + public void handle(Message message, Throwable t) { + observation.recordBlockingErrorHandler(t); + } +}