bidi_stream_query documented but not implemented in AdkApp.register_operations() #5611

@ayushworks

Summary

The Google Cloud documentation references bidi_stream_query as a supported streaming method for deployed ADK agents on Vertex AI Agent Runtime, but the method does not exist in any public version of the ADK SDK.

Expected behavior

bidi_stream_query should be available as a WebSocket-based bidirectional streaming method on a deployed AdkApp, as described in the official discussion post:
https://discuss.google.dev/t/deploy-bidirectional-streaming-agents-with-vertex-ai-agent-engine-and-live-api/266087

Actual behavior

AdkApp.register_operations() returns the following — no bidi_stream_query:

{
    "":             ["get_session", "list_sessions", "create_session", "delete_session"],
    "async":        ["async_get_session", "async_list_sessions", "async_create_session", "async_delete_session", "async_add_session_to_memory", "async_search_memory"],
    "stream":       ["stream_query"],
    "async_stream": ["async_stream_query", "streaming_agent_run_with_events"],
}

Attempting to invoke bidi_stream_query on a deployed engine via client.aio.live.agent_engines.connect() returns:

bidi_stream_query not found. Available methods are: []

Versions tested

  • google-adk==1.32.0: bidi_stream_query absent from register_operations()
  • google-adk==2.0.0b1 (beta): same result, identical register_operations() output

Verified locally by importing AdkApp and calling register_operations() directly:

from vertexai.agent_engines.templates.adk import AdkApp
print(AdkApp.register_operations(None))
# Returns the dict above — no bidi_stream_query
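
To make the check above repeatable, here is a minimal sketch that searches a register_operations()-style mapping for a method name. The dict literal is copied from the output reported in this issue; the helper name find_mode is hypothetical and not part of the ADK SDK.

```python
from typing import Dict, List, Optional

# Output of AdkApp.register_operations() as reported in this issue.
REGISTERED_OPERATIONS: Dict[str, List[str]] = {
    "": ["get_session", "list_sessions", "create_session", "delete_session"],
    "async": [
        "async_get_session", "async_list_sessions", "async_create_session",
        "async_delete_session", "async_add_session_to_memory", "async_search_memory",
    ],
    "stream": ["stream_query"],
    "async_stream": ["async_stream_query", "streaming_agent_run_with_events"],
}


def find_mode(operations: Dict[str, List[str]], method_name: str) -> Optional[str]:
    """Return the mode a method is registered under, or None if it is absent.

    Hypothetical helper for illustration; not an ADK API.
    """
    for mode, names in operations.items():
        if method_name in names:
            return mode
    return None


# None here means the method is not registered under any mode.
BIDI_MODE = find_mode(REGISTERED_OPERATIONS, "bidi_stream_query")
```

With the SDK installed, the same helper could presumably be pointed at the live register_operations() result instead of the copied literal.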

Workaround

I ended up using streaming_agent_run_with_events (registered under async_stream), which works but:

  • Takes a single request_json: str parameter (a JSON-serialised _StreamRunRequest) rather than a LiveRequest object
  • Yields _StreamingRunResponse dicts rather than ADK Event objects directly
  • Each yielded chunk is nested as chunk["output"]["events"]

Example working call:

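import json

# message, customer_id, session_id, and engine are defined elsewhere.
# The async for loop must run inside an async function.
request_json = json.dumps({
    "message": {"role": "user", "parts": [{"text": message}]},
    "user_id": customer_id,
    "session_id": session_id,
})
async for chunk in engine.streaming_agent_run_with_events(request_json=request_json):
    # Events are nested under chunk["output"]["events"]; fall back to the chunk itself.
    events = chunk.get("output", chunk).get("events", [])
    for event in events:
        # Only print text produced by the agent, not user or tool events.
        if event.get("author") == "my_agent":
            for part in event.get("content", {}).get("parts", []):
                print(part.get("text", ""))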
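
The unwrapping in the call above can be factored into a small helper and exercised without a deployed engine. This is a sketch under assumptions: iter_texts, collect_texts, and fake_stream are hypothetical names, fake_stream is a stand-in for engine.streaming_agent_run_with_events(...), and the sample chunks only mimic the chunk["output"]["events"] shape described in this issue.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Iterator, List


def iter_texts(chunk: Dict[str, Any], author: str) -> Iterator[str]:
    """Yield non-empty text parts from one streamed chunk for the given author."""
    # Accept the nested {"output": {"events": [...]}} shape and a flat
    # {"events": [...]} shape, mirroring the fallback used in the call above.
    payload = chunk.get("output", chunk)
    for event in payload.get("events", []):
        if event.get("author") != author:
            continue
        content = event.get("content") or {}
        for part in content.get("parts", []):
            text = part.get("text")
            if text:
                yield text


async def fake_stream() -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical stand-in for the deployed engine's async stream."""
    yield {"output": {"events": [
        {"author": "my_agent", "content": {"parts": [{"text": "Hello"}]}},
        {"author": "user", "content": {"parts": [{"text": "ignored"}]}},
    ]}}
    yield {"events": [
        {"author": "my_agent", "content": {"parts": [{"text": "world"}, {}]}},
    ]}


async def collect_texts(stream: AsyncIterator[Dict[str, Any]], author: str) -> List[str]:
    """Drain the stream and gather every text part written by the author."""
    texts: List[str] = []
    async for chunk in stream:
        texts.extend(iter_texts(chunk, author))
    return texts


RESULT = asyncio.run(collect_texts(fake_stream(), "my_agent"))
```

Parts without a "text" key (for example tool calls) are skipped rather than printed as empty strings, which differs slightly from the loop above.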

Labels

live: [Component] This issue is related to live, voice and video chat
