Skip to content
This repository was archived by the owner on Jun 13, 2023. It is now read-only.

Commit 3dda58c

Browse files
authored
feat(kafka.py): add support for kafka-python producer (#321)
1 parent b185c82 commit 3dda58c

7 files changed

Lines changed: 230 additions & 3 deletions

File tree

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -461,8 +461,9 @@ Epsagon provides out-of-the-box instrumentation (tracing) for many popular frame
461461
|azure.cosmos |`>=4.0.0` |
462462
|celery |`>=4.0.0` |
463463
|grpc |`>=0.3-10` |
464-
|greengrasssdk |`>=1.4.0` |
465-
|SQLAlchemy |`>=1.2.0` |
464+
|greengrasssdk |`>=1.4.0` |
465+
|SQLAlchemy |`>=1.2.0` |
466+
|kafka-python |`>=1.4.0` |
466467

467468

468469

epsagon/events/kafka.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
"""
2+
kafka-python events.
3+
"""
4+
5+
from __future__ import absolute_import
6+
import traceback
7+
from uuid import uuid4
8+
9+
from epsagon.utils import add_data_if_needed
10+
from ..trace import trace_factory
11+
from ..event import BaseEvent
12+
from ..constants import EPSAGON_HEADER
13+
14+
15+
class KafkaEvent(BaseEvent):
    """
    Represents base Kafka event.
    """

    ORIGIN = 'kafka'
    RESOURCE_TYPE = 'kafka'

    # pylint: disable=W0613
    def __init__(self, wrapped, instance, args, kwargs, start_time, response,
                 exception):
        """
        Initialize.
        :param wrapped: wrapt's wrapped
        :param instance: wrapt's instance; its `config` mapping is read for
            `bootstrap_servers` and `client_id`
        :param args: wrapt's args; `args[0]` is used as the topic
        :param kwargs: wrapt's kwargs; `value`, `key` and `headers` are read
            and each of them may be missing or None
        :param start_time: Start timestamp (epoch)
        :param response: response data
        :param exception: Exception (if happened)
        """
        super(KafkaEvent, self).__init__(start_time)
        self.event_id = 'kafka-{}'.format(str(uuid4()))

        topic = args[0]
        # `headers` is an iterable of (key, value) pairs. It can be missing
        # or None when the message is sent without headers, so fall back to
        # an empty mapping instead of raising KeyError/TypeError.
        headers = dict(kwargs.get('headers') or [])
        key = kwargs.get('key')
        servers = instance.config['bootstrap_servers']
        if servers and isinstance(servers, list):
            # Take the first server if it is a list
            servers = servers[0]

        self.resource['name'] = topic
        self.resource['operation'] = 'send'
        self.resource['metadata'] = {
            'messaging.system': 'kafka',
            'messaging.destination': topic,
            'messaging.url': servers,
            'messaging.message_payload_size_bytes': (
                len(str(kwargs.get('value', '')))
            ),
        }
        metadata = self.resource['metadata']

        if instance.config.get('client_id'):
            metadata['messaging.kafka.client_id'] = (
                instance.config['client_id']
            )
        if headers.get(EPSAGON_HEADER):
            metadata[EPSAGON_HEADER] = headers[EPSAGON_HEADER]
        if key:
            metadata['messaging.kafka.message_key'] = key

        add_data_if_needed(metadata, 'messaging.headers', headers)
        add_data_if_needed(metadata, 'messaging.message', kwargs.get('value'))

        # Only responses exposing a non-None `value` carry data to record.
        if getattr(response, 'value', None) is not None:
            self.update_response(response.value)

        if exception is not None:
            self.set_exception(exception, traceback.format_exc())

    def update_response(self, response):
        """
        Adds response data to event.
        :param response: the `value` of the send result; only its
            `partition` attribute is read
        :return: None
        """
        self.resource['metadata']['messaging.kafka.partition'] = (
            response.partition
        )
96+
97+
98+
class KafkaEventFactory(object):
    """
    Factory class, generates a kafka event.
    """

    @staticmethod
    def create_event(wrapped, instance, args, kwargs, start_time, response,
                     exception):
        """
        Build a KafkaEvent from the wrapped call and add it to the trace.
        :param wrapped: wrapt's wrapped
        :param instance: wrapt's instance
        :param args: wrapt's args
        :param kwargs: wrapt's kwargs
        :param start_time: Start timestamp (epoch)
        :param response: response data
        :param exception: Exception (if happened)
        :return: None
        """
        trace_factory.add_event(
            KafkaEvent(
                wrapped,
                instance,
                args,
                kwargs,
                start_time,
                response,
                exception
            )
        )

epsagon/modules/kafka.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
"""
2+
kafka-python patcher module.
3+
"""
4+
5+
from __future__ import absolute_import
6+
import wrapt
7+
from epsagon.modules.general_wrapper import wrapper
8+
from ..events.kafka import KafkaEventFactory
9+
from ..constants import EPSAGON_HEADER
10+
from ..utils import get_epsagon_http_trace_id
11+
12+
13+
def _parse_args(
14+
topic,
15+
value=None,
16+
key=None,
17+
headers=None,
18+
partition=None,
19+
timestamp_ms=None
20+
):
21+
"""Sort and return args and kwargs according to the original signature"""
22+
return (topic, ), {
23+
'value': value,
24+
'key': key,
25+
'headers': headers,
26+
'partition': partition,
27+
'timestamp_ms': timestamp_ms
28+
}
29+
30+
31+
def _wrapper(wrapped, instance, args, kwargs):
    """
    KafkaProducer.send wrapper.
    Normalizes the call arguments, adds the Epsagon trace header to the
    outgoing message headers and delegates to the general wrapper.
    :param wrapped: wrapt's wrapped
    :param instance: wrapt's instance
    :param args: wrapt's args
    :param kwargs: wrapt's kwargs
    :return: whatever the general wrapper returns
    """
    new_args, new_kwargs = _parse_args(*args, **kwargs)

    # Adds epsagon header.
    # Work on a copy: appending to the caller's own list would leak the
    # trace header into their object and pile up duplicates when the same
    # headers list is reused across send() calls. Copying also accepts
    # non-list iterables such as tuples.
    headers = list(new_kwargs.get('headers') or [])
    headers.append(
        (EPSAGON_HEADER, get_epsagon_http_trace_id().encode())
    )
    new_kwargs['headers'] = headers

    return wrapper(KafkaEventFactory, wrapped, instance, new_args, new_kwargs)
43+
44+
45+
def patch():
    """
    Register `_wrapper` around kafka-python's producer send method.
    :return: None
    """
    target_module = 'kafka.producer.kafka'
    target_method = 'KafkaProducer.send'
    wrapt.wrap_function_wrapper(target_module, target_method, _wrapper)

epsagon/trace.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ def _decimal_serializer(o):
5353
""" Taken from `bootstrap.py` of AWS Lambda Python runtime """
5454
if isinstance(o, decimal.Decimal):
5555
return _number_str(o)
56+
if isinstance(o, bytes):
57+
return o.decode('utf-8')
5658
raise TypeError(repr(o) + ' is not JSON serializable')
5759

5860

@@ -552,7 +554,6 @@ def send_traces(self, trace=None):
552554
if not trace_sent:
553555
self.pop_trace(trace=trace)
554556

555-
556557
def prepare(self):
557558
"""
558559
Prepare the relevant trace.

epsagon/trace_encoder.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ def default(self, o): # pylint: disable=method-hidden
1414
return list(o)
1515
if isinstance(o, (datetime, date)):
1616
return o.isoformat()
17+
if isinstance(o, bytes):
18+
return o.decode('utf-8')
1719

1820
output = repr(o)
1921
try:

requirements-dev.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ httpx; python_version >= '3.5'
2020
asynctest; python_version >= '3.5'
2121
moto
2222
tornado
23+
kafka-python

tests/events/test_kafka.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import json
2+
import epsagon.wrappers.python_function
3+
import epsagon.runners.python_function
4+
import epsagon.constants
5+
import mock
6+
from kafka import KafkaProducer
7+
8+
TEST_URL = 'https://example.test/'
9+
10+
def _fake_append(*args, **kwargs):
    """
    Stand-in used as the side effect of the patched RecordAccumulator.append.

    Deliberately not named `test*`: pytest would otherwise collect this
    helper as a test case and call it.
    """
    return [{}, False, False]


@mock.patch('epsagon.trace.TraceFactory.add_event')
@mock.patch('kafka.producer.kafka.KafkaProducer._wait_on_metadata')
@mock.patch('kafka.producer.kafka.KafkaProducer._partition')
@mock.patch(
    'kafka.producer.record_accumulator.RecordAccumulator.append',
    side_effect=_fake_append
)
def test_sanity(append_mock, partition_mock, wait_on_metadata_mock,
                add_event_mock):
    """
    Send one message through a patched KafkaProducer inside an Epsagon
    wrapped function and check the event handed to the trace factory.
    """
    retval = 'success'
    body = {'test': 1}

    @epsagon.wrappers.python_function.python_wrapper
    def wrapped_function():
        producer = KafkaProducer(
            bootstrap_servers=['host:10'],
            client_id='test_client_id',
            api_version=(0, 11, 5),
            value_serializer=lambda x: json.dumps(x).encode('ascii'),
        )
        producer.send(
            'topic',
            body,
            headers=[('content-encoding', b'base64')]
        )
        return retval

    assert wrapped_function() == retval
    wait_on_metadata_mock.assert_called()
    partition_mock.assert_called()
    append_mock.assert_called()
    add_event_mock.assert_called()

    event = add_event_mock.call_args_list[0].args[0]
    metadata = event.resource['metadata']
    assert event.resource['name'] == 'topic'
    assert event.resource['operation'] == 'send'
    assert event.resource['type'] == 'kafka'
    assert metadata['messaging.kafka.client_id'] == 'test_client_id'
    assert metadata['messaging.message_payload_size_bytes'] == len(str(body))
    assert metadata['messaging.message'] == body
    # The patcher is expected to have injected the Epsagon trace header.
    assert epsagon.constants.EPSAGON_HEADER in metadata['messaging.headers']

0 commit comments

Comments
 (0)