diff --git a/sentry_sdk/integrations/dramatiq.py b/sentry_sdk/integrations/dramatiq.py index ccda468b00..01f000f33d 100644 --- a/sentry_sdk/integrations/dramatiq.py +++ b/sentry_sdk/integrations/dramatiq.py @@ -6,11 +6,13 @@ from sentry_sdk.consts import OP, SPANSTATUS from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.integrations._wsgi_common import request_body_within_bounds +from sentry_sdk.traces import SegmentSource from sentry_sdk.tracing import ( BAGGAGE_HEADER_NAME, SENTRY_TRACE_HEADER_NAME, TransactionSource, ) +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.utils import ( AnnotatedValue, capture_internal_exceptions, @@ -114,7 +116,8 @@ def before_enqueue( } def before_process_message(self, broker: "Broker", message: "Message[R]") -> None: - integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(DramatiqIntegration) if integration is None: return @@ -129,21 +132,35 @@ def before_process_message(self, broker: "Broker", message: "Message[R]") -> Non # start new trace in case of retrying sentry_headers = {} - transaction = continue_trace( - sentry_headers, - name=message.actor_name, - op=OP.QUEUE_TASK_DRAMATIQ, - source=TransactionSource.TASK, - origin=DramatiqIntegration.origin, - ) - transaction.set_status(SPANSTATUS.OK) - sentry_sdk.start_transaction( - transaction, - name=message.actor_name, - op=OP.QUEUE_TASK_DRAMATIQ, - source=TransactionSource.TASK, - ) - transaction.__enter__() + if has_span_streaming_enabled(client.options): + sentry_sdk.traces.continue_trace(sentry_headers) + span = sentry_sdk.traces.start_span( + name=message.actor_name, + attributes={ + "sentry.op": OP.QUEUE_TASK_DRAMATIQ, + "sentry.origin": DramatiqIntegration.origin, + "sentry.span.source": SegmentSource.TASK.value, + }, + ) + message._sentry_span_ctx = span + span.__enter__() + else: + transaction = continue_trace( + sentry_headers, + name=message.actor_name, + op=OP.QUEUE_TASK_DRAMATIQ, + source=TransactionSource.TASK, + origin=DramatiqIntegration.origin, + ) + transaction.set_status(SPANSTATUS.OK) + sentry_sdk.start_transaction( + transaction, + name=message.actor_name, + op=OP.QUEUE_TASK_DRAMATIQ, + source=TransactionSource.TASK, + ) + transaction.__enter__() + message._sentry_span_ctx = transaction def after_process_message( self, @@ -161,8 +178,8 @@ def after_process_message( throws = message.options.get("throws") or actor.options.get("throws") scope_manager = message._scope_manager - transaction = sentry_sdk.get_current_scope().transaction - if not transaction: + span_ctx = getattr(message, "_sentry_span_ctx", None) + if span_ctx is None: return None is_event_capture_required = ( @@ -172,7 +189,7 @@ def after_process_message( ) if not is_event_capture_required: # normal transaction finish - transaction.__exit__(None, None, None) + span_ctx.__exit__(None, None, None) scope_manager.__exit__(None, None, None) return @@ -186,7 +203,7 @@ def after_process_message( ) sentry_sdk.capture_event(event, hint=hint) # transaction error - transaction.__exit__(type(exception), exception, None) + span_ctx.__exit__(type(exception), exception, None) scope_manager.__exit__(type(exception), exception, None) after_skip_message = after_process_message diff --git a/tests/integrations/dramatiq/test_dramatiq.py b/tests/integrations/dramatiq/test_dramatiq.py index a9d3966839..f75b97a592 100644 --- a/tests/integrations/dramatiq/test_dramatiq.py +++ b/tests/integrations/dramatiq/test_dramatiq.py @@ -17,10 +17,14 @@ @pytest.fixture(scope="function") def broker(request, sentry_init): - sentry_init( - integrations=[DramatiqIntegration()], - traces_sample_rate=getattr(request, "param", None), - ) + param = getattr(request, "param", None) + if isinstance(param, dict): + sentry_init(integrations=[DramatiqIntegration()], **param) + else: + sentry_init( + integrations=[DramatiqIntegration()], + traces_sample_rate=param, + ) broker = StubBroker() broker.emit_after("process_boot") dramatiq.set_broker(broker) @@ -66,22 +70,77 @@ def dummy_actor(x, y): @pytest.mark.parametrize( - "broker,expected_span_status,fail_fast", + "broker,expected_span_status,fail_fast,span_streaming", [ - (1.0, SPANSTATUS.INTERNAL_ERROR, False), - (1.0, SPANSTATUS.OK, False), - (1.0, SPANSTATUS.INTERNAL_ERROR, True), - (1.0, SPANSTATUS.OK, True), + ({"traces_sample_rate": 1.0}, SPANSTATUS.INTERNAL_ERROR, False, False), + ({"traces_sample_rate": 1.0}, SPANSTATUS.OK, False, False), + ({"traces_sample_rate": 1.0}, SPANSTATUS.INTERNAL_ERROR, True, False), + ({"traces_sample_rate": 1.0}, SPANSTATUS.OK, True, False), + ( + { + "traces_sample_rate": 1.0, + "_experiments": {"trace_lifecycle": "stream"}, + }, + SPANSTATUS.INTERNAL_ERROR, + False, + True, + ), + ( + { + "traces_sample_rate": 1.0, + "_experiments": {"trace_lifecycle": "stream"}, + }, + SPANSTATUS.OK, + False, + True, + ), + ( + { + "traces_sample_rate": 1.0, + "_experiments": {"trace_lifecycle": "stream"}, + }, + SPANSTATUS.INTERNAL_ERROR, + True, + True, + ), + ( + { + "traces_sample_rate": 1.0, + "_experiments": {"trace_lifecycle": "stream"}, + }, + SPANSTATUS.OK, + True, + True, + ), + ], + ids=[ + "error", + "success", + "error_fail_fast", + "success_fail_fast", + "error_stream", + "success_stream", + "error_fail_fast_stream", + "success_fail_fast_stream", ], - ids=["error", "success", "error_fail_fast", "success_fail_fast"], indirect=["broker"], ) def test_task_transaction( - broker, worker, capture_events, expected_span_status, fail_fast + broker, + worker, + capture_events, + capture_items, + expected_span_status, + fail_fast, + span_streaming, ): - events = capture_events() task_fails = expected_span_status == SPANSTATUS.INTERNAL_ERROR + if span_streaming: + items = capture_items("event", "span") + else: + events = capture_events() + @dramatiq.actor(max_retries=0) def dummy_actor(x, y): return x / y @@ -95,37 +154,93 @@ def dummy_actor(x, y): broker.join(dummy_actor.queue_name, fail_fast=fail_fast) worker.join() + sentry_sdk.flush() + + if span_streaming: + if task_fails: + error_item, segment_item = items + error_event = error_item.payload + exception = error_event["exception"]["values"][0] + assert exception["type"] == "ZeroDivisionError" + assert exception["mechanism"]["type"] == DramatiqIntegration.identifier + else: + (segment_item,) = items + + segment = segment_item.payload + assert segment_item.type == "span" + assert segment["name"] == "dummy_actor" + assert segment["is_segment"] is True + assert segment["attributes"]["sentry.op"] == "queue.task.dramatiq" + assert segment["attributes"]["sentry.span.source"] == "task" + assert segment["status"] == ("error" if task_fails else "ok") + else: + if task_fails: + error_event = events.pop(0) + exception = error_event["exception"]["values"][0] + assert exception["type"] == "ZeroDivisionError" + assert exception["mechanism"]["type"] == DramatiqIntegration.identifier - if task_fails: - error_event = events.pop(0) - exception = error_event["exception"]["values"][0] - assert exception["type"] == "ZeroDivisionError" - assert exception["mechanism"]["type"] == DramatiqIntegration.identifier + (event,) = events + assert event["type"] == "transaction" + assert event["transaction"] == "dummy_actor" + assert event["transaction_info"] == {"source": TransactionSource.TASK} + assert event["contexts"]["trace"]["status"] == expected_span_status - (event,) = events - assert event["type"] == "transaction" - assert event["transaction"] == "dummy_actor" - assert event["transaction_info"] == {"source": TransactionSource.TASK} - assert event["contexts"]["trace"]["status"] == expected_span_status +@pytest.mark.parametrize( + "broker,span_streaming", + [ + ({"traces_sample_rate": 1.0}, False), + ( + { + "traces_sample_rate": 1.0, + "_experiments": {"trace_lifecycle": "stream"}, + }, + True, + ), + ], + ids=["static", "stream"], + indirect=["broker"], +) +def test_dramatiq_propagate_trace( + broker, worker, capture_events, capture_items, span_streaming +): + if span_streaming: + items = capture_items("span") -@pytest.mark.parametrize("broker", [1.0], indirect=True) -def test_dramatiq_propagate_trace(broker, worker, capture_events): - events = capture_events() + with sentry_sdk.traces.start_span(name="outer") as outer_span: - @dramatiq.actor(max_retries=0) - def propagated_trace_task(): - pass + @dramatiq.actor(max_retries=0) + def propagated_trace_task(): + pass - with start_transaction() as outer_transaction: - propagated_trace_task.send() - broker.join(propagated_trace_task.queue_name) - worker.join() + propagated_trace_task.send() + broker.join(propagated_trace_task.queue_name) + worker.join() - assert ( - events[0]["transaction"] == "propagated_trace_task" - ) # the "inner" transaction - assert events[0]["contexts"]["trace"]["trace_id"] == outer_transaction.trace_id + sentry_sdk.flush() + + inner_segment, outer_segment = [i.payload for i in items] + assert inner_segment["name"] == "propagated_trace_task" + assert inner_segment["attributes"]["sentry.op"] == "queue.task.dramatiq" + assert inner_segment["trace_id"] == outer_span.trace_id + assert outer_segment["name"] == "outer" + else: + events = capture_events() + + @dramatiq.actor(max_retries=0) + def propagated_trace_task(): + pass + + with start_transaction() as outer_transaction: + propagated_trace_task.send() + broker.join(propagated_trace_task.queue_name) + worker.join() + + assert ( + events[0]["transaction"] == "propagated_trace_task" + ) # the "inner" transaction + assert events[0]["contexts"]["trace"]["trace_id"] == outer_transaction.trace_id @pytest.mark.parametrize( @@ -389,19 +504,39 @@ def dummy_actor(): assert events == [] -@pytest.mark.parametrize("broker", [1.0], indirect=True) +@pytest.mark.parametrize( + "broker,span_streaming", + [ + ({"traces_sample_rate": 1.0}, False), + ( + { + "traces_sample_rate": 1.0, + "_experiments": {"trace_lifecycle": "stream"}, + }, + True, + ), + ], + ids=["static", "stream"], + indirect=["broker"], +) def test_that_skip_message_cleans_up_scope_and_transaction( - broker, worker, capture_events + broker, worker, capture_events, capture_items, span_streaming ): - transactions: list[Transaction] = [] + captured_spans: list = [] class SkipMessageMiddleware(Middleware): def before_process_message(self, broker, message): - transactions.append(sentry_sdk.get_current_scope().transaction) + if span_streaming: + captured_spans.append(sentry_sdk.get_current_span()) + else: + captured_spans.append(sentry_sdk.get_current_scope().transaction) raise SkipMessage() broker.add_middleware(SkipMessageMiddleware()) + if span_streaming: + items = capture_items("span") + @dramatiq.actor(max_retries=0) def skipped_actor(): ... @@ -410,5 +545,12 @@ def skipped_actor(): ... broker.join(skipped_actor.queue_name) worker.join() - (transaction,) = transactions - assert transaction.timestamp is not None + if span_streaming: + sentry_sdk.flush() + (segment_payload,) = [i.payload for i in items] + assert segment_payload["name"] == "skipped_actor" + assert segment_payload["end_timestamp"] is not None + else: + (transaction,) = captured_spans + assert isinstance(transaction, Transaction) + assert transaction.timestamp is not None