Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,311 @@
import unittest
from __future__ import annotations
from unittest.mock import patch, MagicMock
import uuid
import types

# Standard library imports
from typing import Optional, Callable

# Domain imports
from a2a.types import Message, TextPart, Role
from events.event import Event
from src.google.adk.a2a.converters.event_converter import convert_a2a_task_to_event


def tags(*mark_names: str):
"""Simple marker decorator to attach tags to test methods."""
def _decorator(func: Callable):
setattr(func, "tags", set(mark_names))
return func
return _decorator


class Test_EventConverterConvertA2ATaskToEvent(unittest.TestCase):

@tags('negative', 'invalid', 'regression')
def test_convert_task_raises_value_error_on_none_task(self):
# Arrange
a2a_task = None

# Act & Assert
with self.assertRaisesRegex(ValueError, "A2A task cannot be None"):
convert_a2a_task_to_event(a2a_task)

@tags('positive', 'regression')
def test_artifacts_take_priority_over_status_and_history(self):
# Arrange
# Create artifacts where the last artifact contains "from-artifact-2"
artifact_part1 = TextPart("from-artifact-1")
artifact_part2 = TextPart("from-artifact-2")
artifact1 = types.SimpleNamespace(parts=[artifact_part1])
artifact2 = types.SimpleNamespace(parts=[artifact_part2])

status_message = Message(message_id="status-1", role=Role.agent, parts=[TextPart("from-status")])
status = types.SimpleNamespace(message=status_message)

history_last = Message(message_id="hist-2", role=Role.user, parts=[TextPart("from-history-last")])
history_first = Message(message_id="hist-1", role=Role.user, parts=[TextPart("from-history-first")])

task = types.SimpleNamespace(artifacts=[artifact1, artifact2], status=status, history=[history_first, history_last])

author = "custom-author"
invocation_context = types.SimpleNamespace(invocation_id="inv-123", branch="branch-A")

mocked_event = Event(invocation_id=invocation_context.invocation_id, author=author, branch=invocation_context.branch)

with patch('src.google.adk.a2a.converters.event_converter.convert_a2a_message_to_event', return_value=mocked_event) as mock_convert:
# Act
result = convert_a2a_task_to_event(task, author=author, invocation_context=invocation_context)

# Assert
mock_convert.assert_called_once()
passed_message = mock_convert.call_args[0][0]
passed_author = mock_convert.call_args[0][1]
passed_invocation_ctx = mock_convert.call_args[0][2]

self.assertIsInstance(passed_message, Message)
self.assertEqual(passed_message.role, Role.agent)
self.assertEqual(passed_message.parts, [artifact_part2])
self.assertEqual(passed_author, author)
self.assertIs(passed_invocation_ctx, invocation_context)

self.assertIs(result, mocked_event)

@tags('positive', 'smoke')
def test_uses_status_message_when_parts_present(self):
# Arrange
status_message = Message(message_id="status-1", role=Role.agent, parts=[TextPart("from-status")])
status = types.SimpleNamespace(message=status_message)

task = types.SimpleNamespace(artifacts=[], status=status, history=[])

def side_effect(a2a_message: Message, author: Optional[str], invocation_context: Optional[types.SimpleNamespace], part_converter=None):
# Simulate defaulting author to "a2a agent" when None
return Event(invocation_id="auto-id", author=author or "a2a agent", branch=invocation_context.branch if invocation_context else None)

with patch('src.google.adk.a2a.converters.event_converter.convert_a2a_message_to_event', side_effect=side_effect) as mock_convert:
# Act
result = convert_a2a_task_to_event(task, author=None, invocation_context=None)

# Assert
mock_convert.assert_called_once()
passed_message = mock_convert.call_args[0][0]
self.assertIs(passed_message, status_message)
self.assertEqual(result.author, "a2a agent")

@tags('positive', 'regression')
def test_falls_back_to_history_if_status_message_has_no_parts(self):
# Arrange
empty_status_message = Message(message_id="status-1", role=Role.agent, parts=[])
status = types.SimpleNamespace(message=empty_status_message)

history_last = Message(message_id="hist-2", role=Role.user, parts=[TextPart("from-history")])
history_first = Message(message_id="hist-1", role=Role.user, parts=[TextPart("from-history-old")])

task = types.SimpleNamespace(artifacts=[], status=status, history=[history_first, history_last])

mocked_event = Event(invocation_id="inv", author="a2a agent", branch=None)

with patch('src.google.adk.a2a.converters.event_converter.convert_a2a_message_to_event', return_value=mocked_event) as mock_convert:
# Act
result = convert_a2a_task_to_event(task)

# Assert
mock_convert.assert_called_once()
passed_message = mock_convert.call_args[0][0]
self.assertIs(passed_message, history_last)
self.assertIs(result, mocked_event)

@tags('positive', 'regression')
def test_uses_last_history_message_when_others_unavailable(self):
# Arrange
hist1 = Message(message_id="h1", role=Role.user, parts=[TextPart("first")])
hist2 = Message(message_id="h2", role=Role.agent, parts=[TextPart("last")])

task = types.SimpleNamespace(artifacts=[], status=None, history=[hist1, hist2])

mocked_event = Event(invocation_id="e-123", author="who", branch=None)

with patch('src.google.adk.a2a.converters.event_converter.convert_a2a_message_to_event', return_value=mocked_event) as mock_convert:
# Act
out = convert_a2a_task_to_event(task)

# Assert
mock_convert.assert_called_once()
passed_message = mock_convert.call_args[0][0]
self.assertIs(passed_message, hist2)
self.assertIs(out, mocked_event)

@tags('positive', 'regression')
def test_artifacts_with_empty_parts_still_convert_to_empty_content_event(self):
# Arrange
artifact_empty = types.SimpleNamespace(parts=[])
task = types.SimpleNamespace(artifacts=[artifact_empty], status=None, history=None)

# Prepare converter to return event with empty content
event_with_empty_content = Event(invocation_id="id-1", author="a2a agent", branch=None)
event_with_empty_content.content = types.SimpleNamespace(parts=[])

with patch('src.google.adk.a2a.converters.event_converter.convert_a2a_message_to_event', return_value=event_with_empty_content) as mock_convert:
# Act
result = convert_a2a_task_to_event(task)

# Assert
mock_convert.assert_called_once()
passed_message = mock_convert.call_args[0][0]
self.assertIsInstance(passed_message, Message)
self.assertEqual(passed_message.role, Role.agent)
self.assertEqual(passed_message.parts, [])
self.assertIs(result, event_with_empty_content)
self.assertTrue(hasattr(result, 'content'))
self.assertEqual(getattr(result.content, 'parts', None), [])

@tags('positive', 'smoke')
def test_minimal_event_created_when_no_message_available(self):
# Arrange
task = types.SimpleNamespace(artifacts=[], status=None, history=[])
fixed_uuid = uuid.UUID("00000000-0000-0000-0000-000000000001")

with patch('src.google.adk.a2a.converters.event_converter.uuid.uuid4', return_value=fixed_uuid):
# Act
result = convert_a2a_task_to_event(task, author=None, invocation_context=None)

# Assert
self.assertEqual(result.invocation_id, str(fixed_uuid))
self.assertEqual(result.author, "a2a agent")
self.assertIsNone(result.branch)

@tags('positive', 'regression')
def test_minimal_event_uses_context_and_custom_author(self):
# Arrange
task = types.SimpleNamespace(artifacts=[], status=None, history=[])
ctx = types.SimpleNamespace(invocation_id="inv-ctx-001", branch="branch-B")
author = "custom-author"

# Act
result = convert_a2a_task_to_event(task, author=author, invocation_context=ctx)

# Assert
self.assertEqual(result.invocation_id, "inv-ctx-001")
self.assertEqual(result.branch, "branch-B")
self.assertEqual(result.author, "custom-author")

@tags('negative', 'error-handling', 'regression')
def test_converter_exception_wrapped_as_runtime_error(self):
# Arrange
history_message = Message(message_id="h1", role=Role.user, parts=[TextPart("data")])
task = types.SimpleNamespace(artifacts=[], status=None, history=[history_message])

with patch('src.google.adk.a2a.converters.event_converter.convert_a2a_message_to_event', side_effect=ValueError("conversion-failed")):
# Act & Assert
with self.assertRaisesRegex(RuntimeError, r"^Failed to convert task message: conversion-failed"):
convert_a2a_task_to_event(task)

@tags('positive', 'smoke')
def test_returns_converter_event_unmodified(self):
# Arrange
history_message = Message(message_id="h-last", role=Role.user, parts=[TextPart("go")])
task = types.SimpleNamespace(artifacts=[], status=None, history=[history_message])

event_from_converter = Event(invocation_id="ret-1", author="author-1", branch="b1")
# Optionally enrich event to ensure it passes through unchanged
event_from_converter.long_running_tool_ids = {"tool-1"}
event_from_converter.content = types.SimpleNamespace(parts=[types.SimpleNamespace(kind="text", text="payload")])

with patch('src.google.adk.a2a.converters.event_converter.convert_a2a_message_to_event', return_value=event_from_converter) as mock_convert:
# Act
result = convert_a2a_task_to_event(task)

# Assert
mock_convert.assert_called_once()
self.assertIs(result, event_from_converter)
self.assertEqual(result.invocation_id, "ret-1")
self.assertEqual(result.author, "author-1")
self.assertEqual(result.branch, "b1")
self.assertEqual(result.long_running_tool_ids, {"tool-1"})
self.assertEqual(result.content.parts[0].text, "payload")

@tags('positive', 'extensibility', 'regression')
def test_custom_part_converter_is_forwarded(self):
# Arrange
history_message = Message(message_id="h1", role=Role.user, parts=[TextPart("x")])
task = types.SimpleNamespace(artifacts=[], status=None, history=[history_message])

def custom_converter(part):
# TODO: Replace with a real converter when integrating
return []

with patch('src.google.adk.a2a.converters.event_converter.convert_a2a_message_to_event', return_value=Event(invocation_id="i", author="a", branch=None)) as mock_convert:
# Act
_ = convert_a2a_task_to_event(task, part_converter=custom_converter)

# Assert
self.assertTrue(mock_convert.called)
# Ensure the custom converter object is forwarded unchanged
forwarded_converter = mock_convert.call_args.kwargs.get('part_converter', None)
self.assertIs(forwarded_converter, custom_converter)

@tags('positive', 'context', 'regression')
def test_invocation_context_forwarded_in_conversion_path(self):
# Arrange
status_message = Message(message_id="s1", role=Role.agent, parts=[TextPart("ctx")])
status = types.SimpleNamespace(message=status_message)
task = types.SimpleNamespace(artifacts=[], status=status, history=None)

ctx = types.SimpleNamespace(invocation_id="inv-X", branch="branch-X")

def side_effect(a2a_message: Message, author: Optional[str], invocation_context: Optional[types.SimpleNamespace], part_converter=None):
# Assert inside mock: verify the invocation context is received
self.assertIs(invocation_context, ctx)
return Event(invocation_id=invocation_context.invocation_id, author=author or "a2a agent", branch=invocation_context.branch)

with patch('src.google.adk.a2a.converters.event_converter.convert_a2a_message_to_event', side_effect=side_effect) as mock_convert:
# Act
result = convert_a2a_task_to_event(task, invocation_context=ctx)

# Assert
mock_convert.assert_called_once()
self.assertEqual(result.invocation_id, "inv-X")
self.assertEqual(result.branch, "branch-X")

@tags('positive', 'author', 'regression')
def test_author_default_and_custom_in_conversion_path(self):
# Arrange
history_message = Message(message_id="h1", role=Role.user, parts=[TextPart("message")])
task = types.SimpleNamespace(artifacts=[], status=None, history=[history_message])

def side_effect(a2a_message: Message, author: Optional[str], invocation_context: Optional[types.SimpleNamespace], part_converter=None):
return Event(invocation_id="auto", author=author or "a2a agent", branch=None)

with patch('src.google.adk.a2a.converters.event_converter.convert_a2a_message_to_event', side_effect=side_effect):
# Subcase A: author=None -> default to "a2a agent"
result_default = convert_a2a_task_to_event(task, author=None)
self.assertEqual(result_default.author, "a2a agent")

# Subcase B: author explicitly provided
result_custom = convert_a2a_task_to_event(task, author="explicit-author")
self.assertEqual(result_custom.author, "explicit-author")

@tags('positive', 'regression')
def test_history_with_empty_parts_leads_to_empty_content_event(self):
# Arrange
hist_empty = Message(message_id="h-empty", role=Role.user, parts=[])
task = types.SimpleNamespace(artifacts=[], status=None, history=[hist_empty])

event_with_empty_content = Event(invocation_id="i-2", author="a2a agent", branch=None)
event_with_empty_content.content = types.SimpleNamespace(parts=[])

with patch('src.google.adk.a2a.converters.event_converter.convert_a2a_message_to_event', return_value=event_with_empty_content) as mock_convert:
# Act
result = convert_a2a_task_to_event(task)

# Assert
mock_convert.assert_called_once()
passed_message = mock_convert.call_args[0][0]
self.assertIs(passed_message, hist_empty)
self.assertEqual(getattr(result.content, 'parts', None), [])


if __name__ == '__main__':
unittest.main()
14 changes: 14 additions & 0 deletions src/google/adk/a2a/requirements-roost.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

agents
artifacts
auth
events
flows
memory
protobuf
pydantic
runners
starlette
tools
typing_extensions
pytest