Skip to content

Commit 47201a6

Browse files
nightcitybladenightcityblade
andauthored
fix: support aiokafka 0.13.0 which removed api_version parameter (#674)
aiokafka 0.13.0 removed the `api_version` parameter from both `AIOKafkaConsumer` and `AIOKafkaProducer`. Since faust requires `aiokafka>=0.10.0`, it needs to handle both old and new versions. This adds a version check and conditionally includes `api_version` only when running with aiokafka < 0.13.0. Fixes #672 Co-authored-by: nightcityblade <[email protected]>
1 parent ff75c0b commit 47201a6

2 files changed

Lines changed: 27 additions & 8 deletions

File tree

faust/transport/drivers/aiokafka.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import aiokafka
2828
import aiokafka.abc
2929
import opentracing
30+
from packaging.version import Version
31+
32+
_AIOKAFKA_HAS_API_VERSION = Version(aiokafka.__version__) < Version("0.13.0")
3033
from aiokafka import TopicPartition
3134
from aiokafka.consumer.group_coordinator import OffsetCommitRequest
3235
from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
@@ -529,8 +532,11 @@ def _create_worker_consumer(
529532
f"broker_request_timeout={request_timeout}"
530533
)
531534

535+
consumer_kwargs: dict[str, Any] = {}
536+
if _AIOKAFKA_HAS_API_VERSION:
537+
consumer_kwargs["api_version"] = conf.consumer_api_version
532538
return aiokafka.AIOKafkaConsumer(
533-
api_version=conf.consumer_api_version,
539+
**consumer_kwargs,
534540
client_id=conf.broker_client_id,
535541
group_id=conf.id,
536542
group_instance_id=conf.consumer_group_instance_id,
@@ -1111,7 +1117,7 @@ def __post_init__(self) -> None:
11111117

11121118
def _settings_default(self) -> Mapping[str, Any]:
11131119
transport = cast(Transport, self.transport)
1114-
return {
1120+
settings: dict[str, Any] = {
11151121
"bootstrap_servers": server_list(transport.url, transport.default_port),
11161122
"client_id": self.client_id,
11171123
"acks": self.acks,
@@ -1122,10 +1128,12 @@ def _settings_default(self) -> Mapping[str, Any]:
11221128
"security_protocol": "SSL" if self.ssl_context else "PLAINTEXT",
11231129
"partitioner": self.partitioner,
11241130
"request_timeout_ms": int(self.request_timeout * 1000),
1125-
"api_version": self._api_version,
11261131
"metadata_max_age_ms": self.app.conf.producer_metadata_max_age_ms,
11271132
"connections_max_idle_ms": self.app.conf.producer_connections_max_idle_ms,
11281133
}
1134+
if _AIOKAFKA_HAS_API_VERSION:
1135+
settings["api_version"] = self._api_version
1136+
return settings
11291137

11301138
def _settings_auth(self) -> Mapping[str, Any]:
11311139
return credentials_to_aiokafka_auth(self.credentials, self.ssl_context)

tests/unit/transport/drivers/test_aiokafka.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import faust
1818
from faust import auth
1919
from faust.exceptions import ImproperlyConfigured, NotReady
20+
from faust.transport.drivers.aiokafka import _AIOKAFKA_HAS_API_VERSION
2021
from faust.sensors.monitor import Monitor
2122
from faust.transport.drivers import aiokafka as mod
2223
from faust.transport.drivers.aiokafka import (
@@ -813,8 +814,7 @@ def assert_create_worker_consumer(
813814
c = cthread._create_worker_consumer(transport)
814815
assert c is AIOKafkaConsumer.return_value
815816
max_poll_interval = conf.broker_max_poll_interval
816-
AIOKafkaConsumer.assert_called_once_with(
817-
api_version=app.conf.consumer_api_version,
817+
expected_kwargs = dict(
818818
client_id=conf.broker_client_id,
819819
group_id=conf.id,
820820
group_instance_id=conf.consumer_group_instance_id,
@@ -841,6 +841,9 @@ def assert_create_worker_consumer(
841841
# flush_spans=cthread.flush_spans,
842842
**auth_settings,
843843
)
844+
if _AIOKAFKA_HAS_API_VERSION:
845+
expected_kwargs["api_version"] = app.conf.consumer_api_version
846+
AIOKafkaConsumer.assert_called_once_with(**expected_kwargs)
844847

845848
def test__create_client_consumer(self, *, cthread, app):
846849
transport = cthread.transport
@@ -1382,7 +1385,7 @@ def assert_new_producer(
13821385
with patch("aiokafka.AIOKafkaProducer") as AIOKafkaProducer:
13831386
p = producer._new_producer()
13841387
assert p is AIOKafkaProducer.return_value
1385-
AIOKafkaProducer.assert_called_once_with(
1388+
expected_kwargs = dict(
13861389
bootstrap_servers=bootstrap_servers,
13871390
client_id=client_id,
13881391
acks=acks,
@@ -1393,12 +1396,14 @@ def assert_new_producer(
13931396
security_protocol=security_protocol,
13941397
partitioner=producer.partitioner,
13951398
transactional_id=None,
1396-
api_version=api_version,
13971399
metadata_max_age_ms=metadata_max_age_ms,
13981400
connections_max_idle_ms=connections_max_idle_ms,
13991401
request_timeout_ms=request_timeout_ms,
14001402
**kwargs,
14011403
)
1404+
if _AIOKAFKA_HAS_API_VERSION:
1405+
expected_kwargs["api_version"] = api_version
1406+
AIOKafkaProducer.assert_called_once_with(**expected_kwargs)
14021407

14031408

14041409
class TestProducer(ProducerBaseTest):
@@ -1475,7 +1480,13 @@ def test__new_producer(self, *, app):
14751480
[
14761481
pytest.param(
14771482
{"api_version": "auto"},
1478-
marks=pytest.mark.conf(producer_api_version="auto"),
1483+
marks=[
1484+
pytest.mark.conf(producer_api_version="auto"),
1485+
pytest.mark.skipif(
1486+
not _AIOKAFKA_HAS_API_VERSION,
1487+
reason="api_version removed in aiokafka>=0.13.0",
1488+
),
1489+
],
14791490
),
14801491
pytest.param({"acks": -1}, marks=pytest.mark.conf(producer_acks="all")),
14811492
pytest.param(

0 commit comments

Comments
 (0)