Add TwelveLabs video RAG template (Pegasus parser + Marengo embedder) #129
Thank you for your interest in Pathway! Could you please sign the CLA, which is required for the merge? Thank you.
Thanks @zxqfd555, glad it's of interest! I'll get the CLA signed so we can move ahead with the merge. Appreciate you taking a look at the template.
zxqfd555
left a comment
Hi!
I have tested the integration with an actual API key, and I can confirm it works. I have several questions on the implementation. Could you please address them?
    def _upload_asset(self, contents: bytes) -> str:
        """Upload video bytes and return the asset id once it is ready."""
        asset = self.client.assets.create(
I have tested the integration with a real API key, and it works!
However, I have noted that a run creates a new asset, which is not removed afterwards. I suspect it may flood the assets list if there are many runs. Would it be possible to clear the produced assets on completion?
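A minimal sketch of the cleanup being asked for here, using the try/finally pattern and the `delete_assets` flag that the follow-up commit in this thread describes. `FakeAssets`, `parse_video`, and the `analyze` callable are hypothetical stand-ins so the example runs without credentials; the real SDK calls and their signatures may differ.

```python
class FakeAssets:
    """Hypothetical in-memory stand-in for the SDK's asset endpoints."""

    def __init__(self):
        self.live = set()   # ids of assets that currently exist
        self._next_id = 0

    def create(self, contents: bytes) -> str:
        self._next_id += 1
        asset_id = f"asset-{self._next_id}"
        self.live.add(asset_id)
        return asset_id

    def delete(self, asset_id: str) -> None:
        self.live.discard(asset_id)


def parse_video(assets, analyze, contents: bytes, delete_assets: bool = True):
    """Upload, analyze, and (by default) always remove the uploaded asset."""
    asset_id = assets.create(contents)
    try:
        text = analyze(asset_id)
    finally:
        # Runs whether analyze() returned or raised, so nothing is left behind.
        if delete_assets:
            assets.delete(asset_id)
    # A deleted asset id no longer resolves, so only expose it when it is kept.
    metadata = {} if delete_assets else {"twelvelabs_asset_id": asset_id}
    return text, metadata
```

With the default flag, a failing `analyze` still leaves `assets.live` empty; passing `delete_assets=False` keeps the asset and returns its id in the metadata.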
        return len(self._embed_one("."))

    def _embed_one(self, text: str) -> np.ndarray:
        response = self.client.embed.create(model_name=self.model, text=text)
If I understand correctly, this is a blocking call.
        Returns:
            A list of 512-dimensional ``numpy`` arrays, one per input string.
        """
        return [self._embed_one(text) for text in inputs]
If _embed_one is synchronous, this call is potentially inefficient.
It would be better to replace it with an async version if possible. If it's not provided by the current API, please document this behavior.
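To illustrate the difference the reviewer is pointing at, here is a rough sketch of issuing the per-text requests concurrently with `asyncio.gather` instead of one after another. `FakeAsyncEmbedClient` and `embed_all` are hypothetical names, and the fake client only simulates latency; it is not the TwelveLabs async client.

```python
import asyncio


class FakeAsyncEmbedClient:
    """Hypothetical async client; sleeps briefly to mimic a network call."""

    def __init__(self):
        self.in_flight = 0
        self.max_in_flight = 0

    async def create(self, model_name: str, text: str):
        self.in_flight += 1
        self.max_in_flight = max(self.max_in_flight, self.in_flight)
        await asyncio.sleep(0.01)          # stand-in for request latency
        self.in_flight -= 1
        return [float(len(text))] * 4      # toy 4-dimensional "embedding"


async def embed_all(client, model_name: str, inputs):
    """Start every request at once and wait for all of them."""
    pending = [client.create(model_name=model_name, text=t) for t in inputs]
    # gather() returns results in the same order as the inputs.
    return await asyncio.gather(*pending)


if __name__ == "__main__":
    client = FakeAsyncEmbedClient()
    vectors = asyncio.run(embed_all(client, "example-model", ["a", "bb", "ccc"]))
    print(len(vectors), client.max_in_flight)
```

A serial list comprehension over a blocking call waits for each response before sending the next request; with `gather`, the total wait is closer to the slowest single request.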
…t path

- TwelveLabsVideoParser: delete the per-run uploaded asset after analysis via client.assets.delete() inside a try/finally (cleaned up even if analyze() raises). Add a `delete_assets: bool = True` flag so the asset list isn't flooded by default; pass False to keep assets for reuse/inspection. When deletion is on, omit `twelvelabs_asset_id` from metadata since the id no longer resolves. Docstrings updated accordingly.
- MarengoEmbedder.get_embedding_dimension: document that it is a one-time, setup-time synchronous probe (Pathway calls it once when building the index), not on the hot path, so the single blocking call there is intentional.
- MarengoEmbedder: move the embedding hot path to the async client. Add a lazily built AsyncTwelveLabs `aclient` property and `_aembed_one`, and make __wrapped__ run requests concurrently via asyncio.gather instead of calling the sync client serially. Keep sync `_embed_one` for the setup probe only.
- Tests: cover asset deletion (default + disabled + delete-on-analyze-failure) and the new async, concurrent __wrapped__ path.
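The split described in this commit (a synchronous client kept for the one-time dimension probe, plus an async client built only on first use) could look roughly like the sketch below. `SketchEmbedder`, the factory arguments, and the stub's `embed` method are hypothetical; the actual `MarengoEmbedder` and SDK clients are not reproduced here.

```python
class SketchEmbedder:
    """Hypothetical embedder: eager sync client, lazily built async client."""

    def __init__(self, make_sync_client, make_async_client):
        self.client = make_sync_client()           # used only for the setup probe
        self._make_async_client = make_async_client
        self._aclient = None                       # built on first access

    @property
    def aclient(self):
        # Construct the async client once, the first time the hot path needs it.
        if self._aclient is None:
            self._aclient = self._make_async_client()
        return self._aclient

    def get_embedding_dimension(self) -> int:
        # One blocking call at setup time; embed a trivial string and measure it.
        return len(self.client.embed("."))


class StubSyncClient:
    """Hypothetical sync client returning a fixed-size vector."""

    def embed(self, text: str):
        return [0.0] * 512


built = []  # records each async-client construction


def make_async():
    built.append("async-client")
    return object()


embedder = SketchEmbedder(StubSyncClient, make_async)
```

Building the async client lazily means pipelines that only probe the dimension never construct it, and repeated access reuses the same instance.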
Thanks so much for testing this with a real key, @zxqfd555, really appreciate it. I've pushed […] 1. Assets flooding the list ([…] 2. […] 3. […] I also added tests for the asset-deletion behavior (default, disabled, and cleanup-on-analyze-failure) and for the new concurrent async […]
zxqfd555
left a comment
Thank you for the changes. Merging.
Thank you @zxqfd555 and the Pathway team for the thorough review and the merge! 🎉 Really appreciate you testing against the live API and the sharp catches on asset cleanup and the async embedding path; the template's better for it. Excited to see TwelveLabs video RAG in Pathway; happy to help with any follow-ups down the line.
As a follow-up, I think we can port the embedder to the Pathway package so it's available out of the box for anyone who pip-installs it. I've described the details in the linked issue.
Hi! I'm Mohit, I work at TwelveLabs (@mohit-twelvelabs).
Introduction
This adds a new, fully opt-in application template: Video RAG with TwelveLabs (`templates/video_rag_twelvelabs/`). It lets a Pathway pipeline do RAG over video by bringing in two TwelveLabs models:
- Pegasus (`TwelveLabsVideoParser`, a `pw.UDF`) that uploads each video as a TwelveLabs asset and turns it into a rich text description (what happens on screen, who/what appears, spoken and on-screen text, the overall topic). Pathway then indexes that text exactly like it indexes a PDF.
- Marengo (`MarengoEmbedder`, a `BaseEmbedder` subclass) used as the retriever embedder.

Both components live in a local `pathway_twelvelabs` package and are wired in entirely through `app.yaml` (mirroring the `multimodal_rag` and `slides_ai_search` templates), so models, prompts, the data source, and the LLM can all be swapped without touching Python.
Context
The existing templates handle documents (PDF/DOCX/slides) but not video. Video is hard to drop into RAG because most stacks only transcribe the audio and discard everything visual. Pegasus captures the whole video as text, and Marengo gives a shared multimodal embedding space. This extends Pathway's live-sync + in-memory-index story to a new modality with zero new infrastructure.
How has this been tested?
- `templates/video_rag_twelvelabs/test_twelvelabs.py`: 4 no-network unit tests (stubbed SDK; run without credentials) covering the embedder vector shape, the Pegasus upload-then-analyze flow, failed-asset handling, and an embedding-dimension regression test. 2 of these are dimension/default checks; a 5th test is a live smoke test that's skipped unless `TWELVELABS_API_KEY` is set.
- `MarengoEmbedder.get_embedding_dimension()` correctly reports 512 (this required overriding the base probe, which assumes a single-vector return).
- Linters (`black`, `isort --profile black`, `flake8`) pass on all new files. The new module type-checks cleanly under `mypy`; the template dir is added to the existing `[tool.mypy] exclude` list, consistent with the other RAG templates.
Types of changes
This is purely additive: a new template directory plus one row in the main README table and one entry in the mypy exclude list. No existing template, default, or behavior is changed.
Related issue(s):
Checklist:
You can grab a free API key at https://twelvelabs.io — there's a generous free tier.
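For the live smoke test mentioned under "How has this been tested?", gating on the environment variable might look like the sketch below. It uses the standard-library `unittest` rather than whatever runner the template actually uses, and `StubEmbedder`, the test class names, and `run_all` are hypothetical.

```python
import os
import unittest

API_KEY = os.environ.get("TWELVELABS_API_KEY")


class StubEmbedder:
    """Hypothetical stand-in for the SDK-backed embedder; no network access."""

    def embed(self, text: str):
        return [0.0] * 512


class OfflineTests(unittest.TestCase):
    def test_vector_shape(self):
        # Runs everywhere, since it never touches the network.
        self.assertEqual(len(StubEmbedder().embed("hello")), 512)


@unittest.skipUnless(API_KEY, "TWELVELABS_API_KEY is not set")
class LiveSmokeTest(unittest.TestCase):
    def test_key_available(self):
        # A real smoke test would call the live API here; this only checks the key.
        self.assertTrue(API_KEY)


def run_all() -> unittest.TestResult:
    """Run both test classes and return the collected result."""
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite(
        [
            loader.loadTestsFromTestCase(OfflineTests),
            loader.loadTestsFromTestCase(LiveSmokeTest),
        ]
    )
    result = unittest.TestResult()
    suite.run(result)
    return result
```

Without the key, the live class is reported as skipped rather than failed, so the offline suite stays green on machines that have no credentials.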