Skip to content
This repository was archived by the owner on Jun 13, 2023. It is now read-only.

Commit c0fb31e

Browse files
authored
fix(kafka.py): avoid setting headers in Kafka V0/V1 (#323)
1 parent 3dda58c commit c0fb31e

2 files changed

Lines changed: 41 additions & 8 deletions

File tree

epsagon/modules/kafka.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@ def _wrapper(wrapped, instance, args, kwargs):
3232
"""KafkaProducer.send wrapper"""
3333
new_args, new_kwargs = _parse_args(*args, **kwargs)
3434

35-
# Adds epsagon header
36-
if not new_kwargs.get('headers'):
37-
new_kwargs['headers'] = []
38-
new_kwargs['headers'].append(
39-
(EPSAGON_HEADER, get_epsagon_http_trace_id().encode())
40-
)
35+
# Adds epsagon header only on Kafka record V2. V0/V1 don't support it
36+
# pylint: disable=protected-access
37+
if instance._max_usable_produce_magic() == 2:
38+
if not new_kwargs.get('headers'):
39+
new_kwargs['headers'] = []
40+
new_kwargs['headers'].append(
41+
(EPSAGON_HEADER, get_epsagon_http_trace_id().encode())
42+
)
4143

4244
return wrapper(KafkaEventFactory, wrapped, instance, new_args, new_kwargs)
4345

tests/events/test_kafka.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77

88
TEST_URL = 'https://example.test/'
99

10-
def test(*args, **kwargs):
10+
def record_mock(*args, **kwargs):
1111
return [{}, False, False]
1212

1313

1414
@mock.patch('epsagon.trace.TraceFactory.add_event')
1515
@mock.patch('kafka.producer.kafka.KafkaProducer._wait_on_metadata')
1616
@mock.patch('kafka.producer.kafka.KafkaProducer._partition')
17-
@mock.patch('kafka.producer.record_accumulator.RecordAccumulator.append', side_effect=test)
17+
@mock.patch('kafka.producer.record_accumulator.RecordAccumulator.append', side_effect=record_mock)
1818
def test_sanity(append_mock, partition_mock, wait_on_metadata_mock, add_event_mock):
1919
retval = 'success'
2020
body = {'test': 1}
@@ -49,3 +49,34 @@ def wrapped_function():
4949
epsagon.constants.EPSAGON_HEADER in
5050
event.resource['metadata']['messaging.headers']
5151
)
52+
53+
54+
@mock.patch('epsagon.trace.TraceFactory.add_event')
55+
@mock.patch('kafka.producer.kafka.KafkaProducer._wait_on_metadata')
56+
@mock.patch('kafka.producer.kafka.KafkaProducer._partition')
57+
@mock.patch('kafka.producer.record_accumulator.RecordAccumulator.append', side_effect=record_mock)
58+
def test_no_header_injection(append_mock, partition_mock, wait_on_metadata_mock, add_event_mock):
59+
# Verify header is not injected in older kafka api versions (V1)
60+
retval = 'success'
61+
body = {'test': 1}
62+
63+
@epsagon.wrappers.python_function.python_wrapper
64+
def wrapped_function():
65+
producer = KafkaProducer(
66+
bootstrap_servers=['host:10'],
67+
client_id='test_client_id',
68+
api_version=(0, 10, 0),
69+
value_serializer=lambda x: json.dumps(x).encode('ascii'),
70+
)
71+
response = producer.send('topic', body)
72+
return retval
73+
assert wrapped_function() == retval
74+
wait_on_metadata_mock.assert_called()
75+
partition_mock.assert_called()
76+
append_mock.assert_called()
77+
add_event_mock.assert_called()
78+
event = add_event_mock.call_args_list[0].args[0]
79+
assert (
80+
epsagon.constants.EPSAGON_HEADER not in
81+
event.resource['metadata']
82+
)

0 commit comments

Comments
 (0)