Python client and server SDK for Inferential inference orchestration. The Python package includes the client SDK (for sending observations and receiving results) and the server (scheduling and dispatch to local models or Ray Serve).
# Client + edge server (pyzmq, protobuf, numpy, pydantic — no Ray)
pip install inferential
# With Ray Serve for distributed serving
pip install inferential[ray]
# Development
pip install inferential[dev]

See the full Quick Start guide for step-by-step setup.
import asyncio
import numpy as np
from inferential import Server, LocalDispatcher
def my_policy(obs: dict) -> dict:
    """Return random float32 actions sized to the first 1-D array in *obs*.

    Falls back to 7 action dimensions when *obs* holds no 1-D ``np.ndarray``.
    """
    # Lazily scan the observation values; next() stops at the first match,
    # mirroring a loop that breaks on the first 1-D array it sees.
    action_dim = next(
        (
            value.shape[0]
            for value in obs.values()
            if isinstance(value, np.ndarray) and value.ndim == 1
        ),
        7,
    )
    noise = np.random.randn(action_dim)
    return {"actions": noise.astype(np.float32)}
dispatcher = LocalDispatcher({"policy-v2": my_policy})
server = Server(bind="tcp://*:5555", dispatcher=dispatcher)
@server.on_metric
def log(name, value, labels):
    """Print the inference latency metric with its client label; ignore other metrics."""
    # Guard clause: only the latency metric is reported.
    if name != "inference_latency_ms":
        return
    print(f"Client {labels.get('client')}: {value:.1f}ms")
asyncio.run(server.run())

import asyncio
import numpy as np
from ray import serve
from inferential import Server
@serve.deployment
class MockPolicy:
    """Ray Serve deployment that answers every observation with random actions."""

    def infer(self, obs: dict) -> dict:
        """Return random float32 actions sized to the first 1-D array in *obs*.

        Uses 7 action dimensions when *obs* holds no 1-D ``np.ndarray``.
        """
        # Generator of candidate widths; only the first one is ever consumed.
        widths = (
            arr.shape[0]
            for arr in obs.values()
            if isinstance(arr, np.ndarray) and arr.ndim == 1
        )
        action_dim = next(widths, 7)
        return {"actions": np.random.randn(action_dim).astype(np.float32)}
serve.run(MockPolicy.bind(), name="policy-v2")
server = Server(bind="tcp://*:5555", models=["policy-v2"]) # defaults to RayDispatcher
asyncio.run(server.run())

import numpy as np
from inferential import Connection
conn = Connection(server="tcp://localhost:5555", client_id="agent-01", client_type="franka")
model = conn.model("policy-v2", latency_budget_ms=30.0)
state = np.random.randn(7).astype(np.float32)
model.observe(urgency=0.8, state=state)
result = model.get_result(timeout_ms=50)
if result is not None:
actions = result["actions"] # np.ndarray
conn.close()

import asyncio
import numpy as np
from inferential import AsyncConnection
async def main():
    """Send one observation to "policy-v2" and wait up to 50 ms for its result."""
    connection = AsyncConnection(server="tcp://localhost:5555", client_id="agent-01")
    # The async context manager closes the connection on exit.
    async with connection as conn:
        policy = conn.model("policy-v2", latency_budget_ms=30.0)
        observation = np.random.randn(7).astype(np.float32)
        await policy.observe(urgency=0.8, state=observation)
        reply = await policy.get_result(timeout_ms=50)
        if reply is None:
            # Timed out: nothing to read.
            return
        actions = reply["actions"]  # np.ndarray
asyncio.run(main())

Creates a ZMQ DEALER connection to the server. The server address may be given with or without the tcp:// prefix.
Async variant using zmq.asyncio.Context. Supports async with for automatic cleanup.
Creates a handle to a specific model on the server.
Sends an observation to the server. Keyword arguments are automatically dispatched:
- np.ndarray values → serialized as tensors (dtype/shape preserved)
- str values → passed as metadata key-value pairs
- urgency (float, 0.0–1.0) → scheduling priority hint
- steps_remaining (int) → remaining steps in trajectory
model.observe(
urgency=0.5,
steps_remaining=120,
state_vector=np.zeros(7, dtype=np.float32),
image=np.zeros((3, 224, 224), dtype=np.uint8),
prompt="describe the scene", # → metadata
)

Waits for a response. Returns a dict mapping tensor keys to numpy arrays, or None on timeout. The dict also includes response_id, model_id, inference_latency_ms, and any metadata from the server.
Closes the ZMQ socket. Called automatically by AsyncConnection.__aexit__.
See Architecture for full details on schedulers, queue management, metrics, and configuration schema.
- Quick Start — Install, run server + client, get your first result
- Architecture — System design, wire protocol, schedulers, metrics
- Examples — Multi-language client demos, server extensions
- Contributing — Commit conventions, branching, code style