-
Notifications
You must be signed in to change notification settings - Fork 334
Extend spring.consume context through Spring Cloud AWS SQS error handlers #11257
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| package datadog.trace.instrumentation.springmessaging; | ||
|
|
||
| import datadog.trace.bootstrap.ContextStore; | ||
| import datadog.trace.bootstrap.instrumentation.java.concurrent.State; | ||
| import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException; | ||
| import java.util.function.BiConsumer; | ||
|
|
||
| public final class SpringCloudAwsErrorHandlerHelper { | ||
| private SpringCloudAwsErrorHandlerHelper() {} | ||
|
|
||
| public static ListenerExecutionFailedException findListenerExecutionFailedException( | ||
| Throwable error) { | ||
| Throwable current = error; | ||
| while (current != null && !(current instanceof ListenerExecutionFailedException)) { | ||
| Throwable cause = current.getCause(); | ||
| if (cause == current) { | ||
| return null; | ||
| } | ||
| current = cause; | ||
| } | ||
| return (ListenerExecutionFailedException) current; | ||
| } | ||
|
|
||
| public static final class CleanupOnError implements BiConsumer<Object, Throwable> { | ||
| private final ContextStore<ListenerExecutionFailedException, State> contextStore; | ||
|
|
||
| public CleanupOnError(ContextStore<ListenerExecutionFailedException, State> contextStore) { | ||
| this.contextStore = contextStore; | ||
| } | ||
|
|
||
| @Override | ||
| public void accept(Object ignored, Throwable error) { | ||
| if (error == null) { | ||
| return; | ||
| } | ||
| ListenerExecutionFailedException listenerException = | ||
| findListenerExecutionFailedException(error); | ||
| if (listenerException != null) { | ||
| SpringMessageErrorHandlerHelper.cancelContinuation(contextStore.get(listenerException)); | ||
| } | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| package datadog.trace.instrumentation.springmessaging; | ||
|
|
||
| import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; | ||
| import static java.util.Collections.singletonMap; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArgument; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArguments; | ||
|
|
||
| import com.google.auto.service.AutoService; | ||
| import datadog.trace.agent.tooling.Instrumenter; | ||
| import datadog.trace.agent.tooling.InstrumenterModule; | ||
| import datadog.trace.bootstrap.ContextStore; | ||
| import datadog.trace.bootstrap.InstrumentationContext; | ||
| import datadog.trace.bootstrap.instrumentation.api.AgentScope; | ||
| import datadog.trace.bootstrap.instrumentation.java.concurrent.State; | ||
| import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException; | ||
| import java.util.Map; | ||
| import net.bytebuddy.asm.Advice; | ||
|
|
||
| @AutoService(InstrumenterModule.class) | ||
| public final class SpringCloudAwsErrorHandlerInstrumentation extends InstrumenterModule.Tracing | ||
| implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { | ||
|
|
||
| public SpringCloudAwsErrorHandlerInstrumentation() { | ||
| super("spring-messaging", "spring-messaging-4"); | ||
| } | ||
|
|
||
| @Override | ||
| public String instrumentedType() { | ||
| return "io.awspring.cloud.sqs.listener.pipeline.ErrorHandlerExecutionStage"; | ||
| } | ||
|
|
||
| @Override | ||
| public String[] helperClassNames() { | ||
| return new String[] { | ||
| packageName + ".SpringMessageErrorHandlerHelper", | ||
| packageName + ".SpringCloudAwsErrorHandlerHelper" | ||
| }; | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, String> contextStore() { | ||
| return singletonMap(ListenerExecutionFailedException.class.getName(), State.class.getName()); | ||
| } | ||
|
|
||
| @Override | ||
| public void methodAdvice(MethodTransformer transformer) { | ||
| transformer.applyAdvice( | ||
| isMethod() | ||
| .and(named("handleError")) | ||
| .and(takesArguments(2)) | ||
| .and(takesArgument(1, Throwable.class)), | ||
| SpringCloudAwsErrorHandlerInstrumentation.class.getName() | ||
| + "$ActivateErrorHandlerContinuation"); | ||
| transformer.applyAdvice( | ||
| isMethod() | ||
| .and(named("handleErrors")) | ||
| .and(takesArguments(2)) | ||
| .and(takesArgument(1, Throwable.class)), | ||
| SpringCloudAwsErrorHandlerInstrumentation.class.getName() | ||
| + "$ActivateErrorHandlerContinuation"); | ||
| transformer.applyAdvice( | ||
| isMethod().and(named("process")).and(takesArguments(2)), | ||
| SpringCloudAwsErrorHandlerInstrumentation.class.getName() + "$CleanupContinuation"); | ||
| transformer.applyAdvice( | ||
| isMethod().and(named("processMany")).and(takesArguments(2)), | ||
| SpringCloudAwsErrorHandlerInstrumentation.class.getName() + "$CleanupContinuation"); | ||
| } | ||
|
|
||
| public static class ActivateErrorHandlerContinuation { | ||
|
|
||
| @Advice.OnMethodEnter(suppress = Throwable.class) | ||
| public static AgentScope onEnter(@Advice.Argument(1) Throwable error) { | ||
| ListenerExecutionFailedException listenerException = | ||
| SpringCloudAwsErrorHandlerHelper.findListenerExecutionFailedException(error); | ||
| if (listenerException == null) { | ||
| return null; | ||
| } | ||
| ContextStore<ListenerExecutionFailedException, State> contextStore = | ||
| InstrumentationContext.get(ListenerExecutionFailedException.class, State.class); | ||
| State state = contextStore.get(listenerException); | ||
| return SpringMessageErrorHandlerHelper.activateContinuation(state); | ||
| } | ||
|
|
||
| @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
| public static void onExit(@Advice.Enter AgentScope scope) { | ||
| if (scope != null) { | ||
| scope.close(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public static class CleanupContinuation { | ||
|
|
||
| @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
| public static void onExit( | ||
| @Advice.Return(readOnly = false) java.util.concurrent.CompletableFuture<?> result) { | ||
| if (result != null) { | ||
| ContextStore<ListenerExecutionFailedException, State> contextStore = | ||
| InstrumentationContext.get(ListenerExecutionFailedException.class, State.class); | ||
| result = | ||
| result.whenComplete(new SpringCloudAwsErrorHandlerHelper.CleanupOnError(contextStore)); | ||
| } | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| package datadog.trace.instrumentation.springmessaging; | ||
|
|
||
| import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; | ||
| import static java.util.Collections.singletonMap; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArgument; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArguments; | ||
|
|
||
| import com.google.auto.service.AutoService; | ||
| import datadog.trace.agent.tooling.Instrumenter; | ||
| import datadog.trace.agent.tooling.InstrumenterModule; | ||
| import datadog.trace.bootstrap.ContextStore; | ||
| import datadog.trace.bootstrap.InstrumentationContext; | ||
| import datadog.trace.bootstrap.instrumentation.java.concurrent.State; | ||
| import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException; | ||
| import java.util.Map; | ||
| import net.bytebuddy.asm.Advice; | ||
|
|
||
| @AutoService(InstrumenterModule.class) | ||
| public final class SpringCloudAwsListenerAdapterInstrumentation extends InstrumenterModule.Tracing | ||
| implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { | ||
|
|
||
| public SpringCloudAwsListenerAdapterInstrumentation() { | ||
| super("spring-messaging", "spring-messaging-4"); | ||
| } | ||
|
|
||
| @Override | ||
| public String instrumentedType() { | ||
| return "io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter"; | ||
| } | ||
|
|
||
| @Override | ||
| public String[] helperClassNames() { | ||
| return new String[] {packageName + ".SpringMessageErrorHandlerHelper"}; | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, String> contextStore() { | ||
| return singletonMap(ListenerExecutionFailedException.class.getName(), State.class.getName()); | ||
| } | ||
|
|
||
| @Override | ||
| public void methodAdvice(MethodTransformer transformer) { | ||
| transformer.applyAdvice( | ||
| isMethod().and(named("invokeHandler")).and(takesArguments(1)), | ||
| SpringCloudAwsListenerAdapterInstrumentation.class.getName() + "$InvokeHandlerAdvice"); | ||
| transformer.applyAdvice( | ||
| isMethod() | ||
| .and(named("createListenerException")) | ||
| .and(takesArguments(2)) | ||
| .and(takesArgument(1, Throwable.class)), | ||
| SpringCloudAwsListenerAdapterInstrumentation.class.getName() | ||
| + "$CreateListenerExceptionAdvice"); | ||
| } | ||
|
|
||
| public static class InvokeHandlerAdvice { | ||
|
|
||
| @Advice.OnMethodEnter(suppress = Throwable.class) | ||
| public static void onEnter() { | ||
| SpringMessageErrorHandlerHelper.enterAwsListenerInvocation(); | ||
| } | ||
|
|
||
| @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
| public static void onExit() { | ||
| SpringMessageErrorHandlerHelper.clearPendingContinuation(); | ||
| SpringMessageErrorHandlerHelper.exitAwsListenerInvocation(); | ||
| } | ||
| } | ||
|
|
||
| public static class CreateListenerExceptionAdvice { | ||
|
|
||
| @Advice.OnMethodExit(suppress = Throwable.class) | ||
| public static void onExit(@Advice.Return ListenerExecutionFailedException listenerException) { | ||
| ContextStore<ListenerExecutionFailedException, State> contextStore = | ||
| InstrumentationContext.get(ListenerExecutionFailedException.class, State.class); | ||
| State state = contextStore.putIfAbsent(listenerException, State.FACTORY); | ||
| SpringMessageErrorHandlerHelper.transferPendingContinuation(state); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| package datadog.trace.instrumentation.springmessaging; | ||
|
|
||
| import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.captureSpan; | ||
|
|
||
| import datadog.trace.bootstrap.instrumentation.api.AgentScope; | ||
| import datadog.trace.bootstrap.instrumentation.api.AgentSpan; | ||
| import datadog.trace.bootstrap.instrumentation.java.concurrent.State; | ||
|
|
||
| public final class SpringMessageErrorHandlerHelper { | ||
| private static final ThreadLocal<Integer> AWS_LISTENER_DEPTH = ThreadLocal.withInitial(() -> 0); | ||
| private static final ThreadLocal<AgentScope.Continuation> PENDING_CONTINUATION = | ||
| new ThreadLocal<>(); | ||
|
|
||
| private SpringMessageErrorHandlerHelper() {} | ||
|
|
||
| public static void enterAwsListenerInvocation() { | ||
| AWS_LISTENER_DEPTH.set(AWS_LISTENER_DEPTH.get() + 1); | ||
| } | ||
|
|
||
| public static void exitAwsListenerInvocation() { | ||
| int depth = AWS_LISTENER_DEPTH.get() - 1; | ||
| if (depth <= 0) { | ||
| AWS_LISTENER_DEPTH.remove(); | ||
| } else { | ||
| AWS_LISTENER_DEPTH.set(depth); | ||
| } | ||
| } | ||
|
|
||
| public static boolean isInAwsListenerInvocation() { | ||
| return AWS_LISTENER_DEPTH.get() > 0; | ||
| } | ||
|
|
||
| public static void capturePendingContinuation(AgentSpan span) { | ||
| if (span == null) { | ||
| return; | ||
| } | ||
| AgentScope.Continuation existing = PENDING_CONTINUATION.get(); | ||
| if (existing != null) { | ||
| existing.cancel(); | ||
| } | ||
| PENDING_CONTINUATION.set(captureSpan(span)); | ||
| } | ||
|
|
||
| public static void transferPendingContinuation(State state) { | ||
| AgentScope.Continuation continuation = PENDING_CONTINUATION.get(); | ||
| PENDING_CONTINUATION.remove(); | ||
| if (continuation == null) { | ||
| return; | ||
| } | ||
| if (state == null) { | ||
| continuation.cancel(); | ||
| return; | ||
| } | ||
| state.setOrCancelContinuation(continuation); | ||
| } | ||
|
|
||
| public static void clearPendingContinuation() { | ||
| AgentScope.Continuation continuation = PENDING_CONTINUATION.get(); | ||
| PENDING_CONTINUATION.remove(); | ||
| if (continuation != null) { | ||
| continuation.cancel(); | ||
| } | ||
| } | ||
|
|
||
| public static AgentScope activateContinuation(State state) { | ||
| if (state == null) { | ||
| return null; | ||
| } | ||
| AgentScope.Continuation continuation = state.getAndResetContinuation(); | ||
| return continuation != null ? continuation.activate() : null; | ||
| } | ||
|
|
||
| public static void cancelContinuation(State state) { | ||
| if (state != null) { | ||
| state.closeContinuation(); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,6 +55,7 @@ public String[] helperClassNames() { | |
| packageName + ".SpringMessageDecorator", | ||
| packageName + ".SpringMessageExtractAdapter", | ||
| packageName + ".SpringMessageExtractAdapter$1", | ||
| packageName + ".SpringMessageErrorHandlerHelper", | ||
| }; | ||
| } | ||
|
|
||
|
|
@@ -95,10 +96,13 @@ public static void onExit( | |
| return; | ||
| } | ||
| AgentSpan span = scope.span(); | ||
| scope.close(); | ||
| if (null != error) { | ||
| DECORATE.onError(span, error); | ||
| if (SpringMessageErrorHandlerHelper.isInAwsListenerInvocation()) { | ||
| SpringMessageErrorHandlerHelper.capturePendingContinuation(span); | ||
| } | ||
|
Comment on lines
99
to
+103
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When an Useful? React with 👍 / 👎. |
||
| } | ||
| scope.close(); | ||
| if (result != null) { | ||
| Object wrappedResult = | ||
| AsyncResultExtensions.wrapAsyncResult(result, result.getClass(), span); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In applications that use Spring Messaging/Spring Kafka but do not include
spring-cloud-aws-sqs, adding this AWS dependency to the onlyspring-messagingmuzzle directive makes the whole instrumentation safe only when AWS SQS is also present. The existingInvocableHandlerMethod/Kafka messaging instrumentation in this same module has nomuzzleDirective()override, and the project docs note module directives are checked against all instrumentations by default, so non-AWS Spring Messaging apps can lose their existingspring.consumeinstrumentation instead of only skipping the new AWS-specific instrumenters. Split the AWS instrumenters behind their own directive/module or overridemuzzleDirective()so the base Spring Messaging instrumentation remains loadable without AWS.Useful? React with 👍 / 👎.