Fix data race in plugins#248
Merged
Merged
Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses multiple concurrency/data-race issues across several plugins and the core tracing/reporter implementation, primarily by isolating async callback tracing, making completion paths one-shot/atomic, and adding new tracing support for batch consumers.
Changes:
- Refactors RocketMQ/Pulsar async producer callbacks to record results on isolated local spans (and adds regression tests for nil-result / panic isolation).
- Fixes RocketMQ batch consumer span reporting by creating a single entry span per batch and introducing a new
ExtractContextAPI to attach additional upstream references. - Hardens several other plugins and reporters against races/panics (gRPC streaming finish flag, go-micro socket close, MongoDB async completion, AMQP map locking, mux writer wrapper, Kafka/gRPC reporter goroutine recover), with new tests.
Reviewed changes
Copilot reviewed 30 out of 30 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| plugins/rocketmq/producer/async_producer.go | Isolates async send callback tracing into a local-span helper with recover and nil-safety. |
| plugins/rocketmq/producer/async_producer_test.go | Adds tests for async callback failure/success paths and panic isolation. |
| plugins/rocketmq/consumer/consumer.go | Creates one entry span per batch and extracts extra refs for remaining messages. |
| plugins/rocketmq/consumer/consumer_test.go | Tests single-span batch semantics and empty-batch no-op. |
| plugins/pulsar/pulsar/send_async_producer.go | Mirrors RocketMQ async callback isolation for Pulsar producer callbacks. |
| plugins/pulsar/pulsar/send_async_producer_test.go | Adds async callback tests for nil ID and tag correctness. |
| plugins/mux/serve_interceptor.go | Fixes default wrapper path to wrap the real ResponseWriter (avoids nil wrapper). |
| plugins/mux/serve_interceptor_test.go | Adds regression tests for non-hijacker writers and interceptor span tagging. |
| plugins/mongo/mongo/interceptor.go | Uses async span lifecycle to finish spans safely when events fire on different goroutines. |
| plugins/mongo/mongo/interceptor_test.go | Tests cross-goroutine completion and exactly-once reporting behavior. |
| plugins/microv4/util/socket/close_interceptor.go | Makes connection-span AsyncFinish one-shot under concurrent Close calls. |
| plugins/microv4/util/socket/close_interceptor_test.go | Adds concurrency tests ensuring finish happens once and field is cleared. |
| plugins/microv4/util/socket/accept_interceptor.go | Extends injected data with sync.Once for one-shot Close completion. |
| plugins/microv4/server/structure.go | Mirrors InjectData structure to stay compatible with ref_generate requirements. |
| plugins/grpc/client_streaming_interceptor.go | Changes finish interception flag to atomic.Bool to eliminate races. |
| plugins/grpc/client_recvmsg_interceptor.go | Arms finish flag via atomic store when RecvMsg runs. |
| plugins/grpc/client_finish_interceptor.go | Consumes finish flag via CAS and ensures async finish is one-shot. |
| plugins/grpc/client_finish_interceptor_test.go | Adds tests for concurrent Finish calls and unarmed no-op behavior. |
| plugins/gorm/entry/callback.go | Stores spans per-statement via InstanceSet/Get and warns on leftover spans. |
| plugins/gorm/entry/callback_test.go | Tests statement-scoped storage, leftover warnings, and clone non-inheritance. |
| plugins/core/tracing/api.go | Adds public ExtractContext API for batch consumer reference attachment. |
| plugins/core/tracing.go | Implements Tracer.ExtractContext to append refs and merge correlation. |
| plugins/core/tracing_extract_test.go | Tests ref-append behavior and entry-span-only no-op semantics. |
| plugins/core/span_default.go | Adds locked appendRef and documents Refs locking/freeze behavior. |
| plugins/core/reporter/kafka/kafka.go | Wraps instance-check loop iterations with recover and factors into checkOnce. |
| plugins/core/reporter/grpc/grpc.go | Wraps profile-task polling iterations with recover and factors into fetchProfileTasksOnce. |
| plugins/core/operator/tracing.go | Extends tracing operator interface to include ExtractContext. |
| plugins/amqp/general_consumer.go | Protects global consumerTag→queue map with RWMutex + accessors. |
| plugins/amqp/general_consumer_test.go | Adds concurrency and basic accessor semantics tests. |
| CHANGES.md | Documents the fixes and newly added API behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
wu-sheng
approved these changes
Jun 6, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.