From ae50f8ee0208a34594126e1963f637c1433406af Mon Sep 17 00:00:00 2001 From: jensenojs Date: Tue, 7 Apr 2026 16:38:13 +0800 Subject: [PATCH 1/4] fix(renderer): guard malformed part render in flush --- lua/opencode/ui/renderer/flush.lua | 31 +- tests/data/todowrite-malformed-session.json | 551 ++++++++++++++++++ .../todowrite_malformed_session_spec.lua | 49 ++ 3 files changed, 629 insertions(+), 2 deletions(-) create mode 100644 tests/data/todowrite-malformed-session.json create mode 100644 tests/replay/todowrite_malformed_session_spec.lua diff --git a/lua/opencode/ui/renderer/flush.lua b/lua/opencode/ui/renderer/flush.lua index 40de9d4d..2db40c03 100644 --- a/lua/opencode/ui/renderer/flush.lua +++ b/lua/opencode/ui/renderer/flush.lua @@ -8,6 +8,29 @@ local buffer = require('opencode.ui.renderer.buffer') local append = require('opencode.ui.renderer.append') local M = {} +local warned_part_render_error = false + +---@param part_id string +---@param message_id string|nil +---@param err any +local function warn_part_render_error_once(part_id, message_id, err) + if warned_part_render_error then + return + end + warned_part_render_error = true + + local err_text = tostring(err):gsub('[\r\n]+', ' ') + local msg = string.format( + 'Skipped malformed part during render (part=%s message=%s). First error: %s', + tostring(part_id), + tostring(message_id), + err_text + ) + + vim.schedule(function() + vim.notify_once('[opencode.nvim] ' .. msg, vim.log.levels.WARN) + end) +end ---@generic T ---@param fn fun(): T @@ -294,11 +317,15 @@ local function format_part(part_id) end local is_last_part = (buffer.get_last_part_for_message(message) == part_id) - local formatted = formatter.format_part(rendered_part.part, message, is_last_part, function(session_id) + local ok, formatted_or_err = pcall(formatter.format_part, rendered_part.part, message, is_last_part, function(session_id) return ctx.render_state:get_child_session_parts(session_id) end) + if not ok then + warn_part_render_error_once(part_id, rendered_part.message_id, formatted_or_err) + return nil, rendered_part.message_id + end - return formatted, rendered_part.message_id + return formatted_or_err, rendered_part.message_id end ---@param message_id string diff --git a/tests/data/todowrite-malformed-session.json b/tests/data/todowrite-malformed-session.json new file mode 100644 index 00000000..6a893514 --- /dev/null +++ b/tests/data/todowrite-malformed-session.json @@ -0,0 +1,551 @@ +[ + { + "info": { + "parentID": "msg_d51155df10017sSiPpalfUA2GR", + "role": "assistant", + "mode": "orchestrator", + "agent": "orchestrator", + "path": { + "cwd": "/Users/oujinsai/Projects/cupel", + "root": "/Users/oujinsai/Projects/cupel" + }, + "cost": 0, + "tokens": { + "total": 56221, + "input": 4120, + "output": 225, + "reasoning": 0, + "cache": { + "write": 0, + "read": 51876 + } + }, + "modelID": "claude-sonnet-4.6", + "providerID": "github-copilot", + "time": { + "created": 1775181848176, + "completed": 1775181853644 + }, + "finish": "tool-calls", + "id": "msg_d51155e70001ph14LyziLptflQ", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S" + }, + "parts": [ + { + "snapshot": "6d93682cd59dc26937a236a546761e2c47156e5d", + "type": "step-start", + "id": "prt_d51156b33001PB2zE2Q6hW0IXA", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d51155e70001ph14LyziLptflQ" + }, + { + "type": "text", + "text": "先把所有相关文件读一遍,再一次性处理。", + "time": { + "start": 1775181853584, + "end": 1775181853584 + }, + "id": "prt_d51156b37001UyqXq3A21XTnem", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d51155e70001ph14LyziLptflQ" + }, + { + "type": "tool", + "tool": "tilth_tilth_read", + "callID": "tooluse_fOt51IIn93uS7K9nURQ8nw", + "state": { + "status": "completed", + "input": { + "paths": [ + "market-ingest/src/fact/mod.rs", + "market-ingest/src/fact/raw_to_fact.rs", + "market-ingest/src/service.rs", + "market-ingest/src/signal/lineage_sidecar.rs", + "market-ingest/src/AGENTS.md" + ] + }, + "output": "# market-ingest/src/fact/mod.rs (14 lines, ~121 tokens) [full]\n\n//! fact 闭包:把来源差异在这里终结为 canonical fact(FactRow)。\n//!\n//! 这里只负责三件事:\n//! - 归一化与物理不变量校验(raw_to_fact/row_invariants)。\n//! - 组装批次与回放锚点,并写入 journal(batch_builder/journal_port)。\n\npub mod batch_builder;\npub mod journal_port;\npub mod raw_to_fact;\npub mod row_invariants;\n\npub use raw_to_fact::{assemble_normalized_batch, normalize_rows};\npub use row_invariants::NormalizeError;\n\n\n# market-ingest/src/fact/raw_to_fact.rs (108 lines, ~926 tokens) [full]\n\n//! fact raw_to_fact:把 adapter 输出收敛为 `FactRow`。\n//!\n//! 设计动机:\n//! - 这里做“事实形状收敛”,不做 worldview 语义解释。\n//! - key canonicalization 的 owner 在 adapter;normalize 不做字段重命名。\n//! - 这一层只做类型收敛,避免把数据源语义和世界观语义耦合在公共路径上。\n\nuse std::collections::BTreeMap;\n\nuse materia::identity::{CodeVersion, SourceFingerprint};\n\nuse crate::adapter::registry::{RawRow, RawValue};\nuse crate::model::{FactBatch, FactRow, PrimitiveValue, TimeViewId};\n\nuse super::batch_builder;\nuse super::row_invariants;\npub use super::row_invariants::NormalizeError;\n\npub fn normalize_rows(\n rows: Vec,\n) -> Result, NormalizeError> {\n let mut fact_rows = Vec::with_capacity(rows.len());\n\n for (index, row) in rows.into_iter().enumerate() {\n row_invariants::validate_row(index, &row)?;\n\n let mut fields = BTreeMap::new();\n for (key, raw_val) in row.fields {\n let pv = match raw_val {\n RawValue::F64(v) => PrimitiveValue::Float(v),\n RawValue::I64(v) => PrimitiveValue::Int(v),\n RawValue::Text(v) => PrimitiveValue::Text(v),\n RawValue::Bool(v) => PrimitiveValue::Bool(v),\n };\n\n fields.insert(key, pv);\n }\n\n fact_rows.push(FactRow {\n timestamp: row.timestamp_ms,\n fields,\n });\n }\n\n Ok(fact_rows)\n}\n\npub fn assemble_normalized_batch(\n fact_rows: Vec,\n symbol: &str,\n time_view_id: TimeViewId,\n source_fingerprint: SourceFingerprint,\n code_version: CodeVersion,\n) -> Result {\n batch_builder::assemble_normalized_batch(\n fact_rows,\n symbol,\n time_view_id,\n source_fingerprint,\n code_version,\n )\n}\n\n#[cfg(test)]\nmod tests {\n use std::collections::BTreeMap;\n\n use crate::adapter::registry::{InterfaceFamily, RawRow, RawValue};\n use crate::model::PrimitiveValue;\n\n use super::normalize_rows;\n\n #[test]\n fn normalize_maps_values_and_keeps_adapter_keys() {\n let mut fields = BTreeMap::new();\n fields.insert(\"bar:open\".to_string(), RawValue::F64(1.2));\n fields.insert(\"bar:high\".to_string(), RawValue::F64(1.3));\n fields.insert(\"bar:low\".to_string(), RawValue::F64(1.1));\n fields.insert(\"bar:close\".to_string(), RawValue::F64(1.25));\n fields.insert(\"bar:volume\".to_string(), RawValue::I64(10));\n fields.insert(\"source\".to_string(), RawValue::Text(\"okx\".to_string()));\n fields.insert(\"real\".to_string(), RawValue::Bool(true));\n\n let rows = vec![RawRow {\n adapter: \"okx\".to_string(),\n family: InterfaceFamily::Kline,\n symbol: \"BTC-USDT\".to_string(),\n timestamp_ms: 1000,\n fields,\n }];\n\n let fact_rows = normalize_rows(rows).expect(\"normalize ok\");\n let fact = fact_rows.first().expect(\"one fact\");\n\n assert_eq!(fact.timestamp, 1000);\n assert_eq!(fact.fields.get(\"bar:open\"), Some(&PrimitiveValue::Float(1.2)));\n assert_eq!(fact.fields.get(\"bar:high\"), Some(&PrimitiveValue::Float(1.3)));\n assert_eq!(fact.fields.get(\"bar:low\"), Some(&PrimitiveValue::Float(1.1)));\n assert_eq!(fact.fields.get(\"bar:close\"), Some(&PrimitiveValue::Float(1.25)));\n assert_eq!(fact.fields.get(\"bar:volume\"), Some(&PrimitiveValue::Int(10)));\n assert_eq!(\n fact.fields.get(\"source\"),\n Some(&PrimitiveValue::Text(\"okx\".to_string()))\n );\n assert_eq!(fact.fields.get(\"real\"), Some(&PrimitiveValue::Bool(true)));\n }\n}\n\n\n# market-ingest/src/service.rs (664 lines, ~5.9k tokens) [full]\n\n//! Service 层——market-ingest 的唯一对外入口。\n//!\n//! 编排 adapter -> fact -> signal 三条执行链,并以 `ReplayContractRef`\n//! 作为 replay 主入参,避免隐式状态补全。\n//!\n//! 设计原则:\n//! - service 只做编排,不承载 adapter SDK 细节。\n//! - service 不解释 worldview 语义,只消费执行规格。\n//! - replay 只消费显式合同与显式事实,不依赖进程内隐式状态。\n\nuse std::fmt;\n\nuse crate::adapter::{self, InterfaceFamily, RawRow, RegistryError};\nuse crate::model::{\n BatchId, FactBatch, IngestReceipt, MaterializeReceipt, ReplayContract, ReplayContractRef,\n ReplayReceipt, SignalBatch, TimePeriod, TimeViewId, Timezone, WorldviewExecutionSpec,\n};\nuse crate::fact::{self, journal_port::JournalPort, NormalizeError};\nuse crate::signal::{self, ProjectionError};\nuse materia::identity::{CodeVersion, SourceFingerprint};\n\n#[derive(Debug)]\npub enum ServiceError {\n Adapter(RegistryError),\n Normalize(NormalizeError),\n Projection(ProjectionError),\n BatchNotFound(BatchId),\n ContractNotFound(ReplayContractRef),\n Journal(String),\n ReplayMismatch {\n field: String,\n expected: String,\n actual: String,\n },\n}\n\nimpl fmt::Display for ServiceError {\n fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {\n match self {\n Self::Adapter(e) => write!(f, \"adapter: {e}\"),\n Self::Normalize(e) => write!(f, \"normalize: {e}\"),\n Self::Projection(e) => write!(f, \"projection: {e}\"),\n Self::BatchNotFound(id) => write!(f, \"batch not found: {:?}\", id),\n Self::ContractNotFound(id) => write!(f, \"replay contract not found: {:?}\", id),\n Self::Journal(e) => write!(f, \"journal: {e}\"),\n Self::ReplayMismatch {\n field,\n expected,\n actual,\n } => write!(\n f,\n \"replay mismatch on '{field}': expected {expected}, got {actual}\"\n ),\n }\n }\n}\n\nimpl std::error::Error for ServiceError {}\n\nimpl From for ServiceError {\n fn from(value: RegistryError) -> Self {\n Self::Adapter(value)\n }\n}\n\nimpl From for ServiceError {\n fn from(value: NormalizeError) -> Self {\n Self::Normalize(value)\n }\n}\n\nimpl From for ServiceError {\n fn from(value: ProjectionError) -> Self {\n Self::Projection(value)\n }\n}\n\npub struct IngestService {\n // 设计动机:replay 不依赖进程隐式状态,所有可复算事实都通过 journal 显式持久。\n journal: J,\n}\n\nimpl IngestService {\n pub fn new() -> Self {\n Self {\n journal: fact::journal_port::MemoryJournal::default(),\n }\n }\n}\n\nimpl Default for IngestService {\n fn default() -> Self {\n Self::new()\n }\n}\n\nimpl IngestService {\n pub fn with_journal(journal: J) -> Self {\n Self { journal }\n }\n\n pub fn ingest_batch(\n &mut self,\n adapter: &str,\n family: InterfaceFamily,\n symbol: &str,\n period: TimePeriod,\n timezone: Timezone,\n time_view_id: TimeViewId,\n limit: usize,\n source_fingerprint: SourceFingerprint,\n code_version: CodeVersion,\n ) -> Result {\n // source_fingerprint 是跨执行可复算锚点。\n validate_source_fingerprint(&source_fingerprint)?;\n let period_str = period_to_provider_str(&period);\n let timezone_str = timezone.as_provider_str();\n let rows = adapter::fetch_raw_rows(adapter, family, symbol, period_str, timezone_str, limit)?;\n self.ingest_from_rows(\n rows,\n symbol,\n time_view_id,\n source_fingerprint,\n code_version,\n )\n }\n\n pub fn ingest_from_rows(\n &mut self,\n rows: Vec,\n symbol: &str,\n time_view_id: TimeViewId,\n source_fingerprint: SourceFingerprint,\n code_version: CodeVersion,\n ) -> Result {\n validate_source_fingerprint(&source_fingerprint)?;\n let fact_rows = fact::normalize_rows(rows)?;\n let batch = fact::assemble_normalized_batch(\n fact_rows,\n symbol,\n time_view_id,\n source_fingerprint,\n code_version,\n )?;\n self.journal\n .append_fact_batch(batch.clone())\n .map_err(ServiceError::Journal)?;\n let ingest_receipt = fact::batch_builder::build_ingest_receipt(&batch);\n self.journal\n .append_ingest(ingest_receipt.clone())\n .map_err(ServiceError::Journal)?;\n Ok(ingest_receipt)\n }\n\n pub fn materialize(\n &mut self,\n batch_id: &BatchId,\n execution_spec: &WorldviewExecutionSpec,\n code_version: CodeVersion,\n ) -> Result<(SignalBatch, MaterializeReceipt), ServiceError> {\n // 先拿 worldview 无关事实闭包,再做 worldview 绑定投影。\n // 注意:这里使用的是执行规约化规格(worldview_pin + signal_keys),\n // 不是完整 worldview 治理对象。\n // service 只负责编排统一解释内核(gate -> mapper),\n // 把“事实是否满足这份执行需求”跑通;\n // 不在本层解释 owner/dimensions 等治理语义,也不写 worldview 分支。\n let batch = self.get_batch(batch_id)?;\n let (signal_samples, _decision, _sidecar) = signal::project_facts(\n &batch.fact_rows,\n execution_spec,\n &batch.time_view_id,\n )?;\n\n // contract_ref 由稳定锚点确定性复现。\n let contract_ref = ReplayContractRef(format!(\n \"replay:{}:{}:{}\",\n batch.batch_id.0,\n execution_spec.worldview_pin.id.0,\n execution_spec.worldview_pin.version.0\n ));\n let execution_spec_fingerprint = execution_spec_fingerprint(execution_spec);\n let replay_contract = ReplayContract {\n contract_ref: contract_ref.clone(),\n replay_anchor: crate::model::ReplayAnchor {\n batch_id: batch.batch_id.clone(),\n source_fingerprint: batch.source_fingerprint.clone(),\n time_view_id: batch.time_view_id.clone(),\n code_version: code_version.clone(),\n fact_count: batch.fact_count,\n time_range: batch.time_range,\n lineage_ref: format!(\n \"lineage:{}:{}\",\n batch.batch_id.0, batch.source_fingerprint.0\n ),\n },\n worldview_pin: execution_spec.worldview_pin.clone(),\n execution_spec_fingerprint,\n };\n self.journal\n .append_replay_contract(replay_contract.clone())\n .map_err(ServiceError::Journal)?;\n\n // SignalBatch 只承载物化产物;事实体通过 batch_id 回查 FactBatch。\n let materialized = SignalBatch {\n batch_id: batch.batch_id.clone(),\n worldview_pin: execution_spec.worldview_pin.clone(),\n time_view_id: batch.time_view_id.clone(),\n source_fingerprint: batch.source_fingerprint.clone(),\n code_version,\n symbol: batch.symbol.clone(),\n signal_samples: signal_samples,\n replay_anchor: replay_contract.replay_anchor.clone(),\n };\n let receipt = MaterializeReceipt {\n batch_id: batch.batch_id.clone(),\n replay_contract_ref: contract_ref,\n };\n\n Ok((materialized, receipt))\n }\n\n pub fn replay(\n &self,\n contract_ref: &ReplayContractRef,\n execution_spec: &WorldviewExecutionSpec,\n ) -> Result<(SignalBatch, ReplayReceipt), ServiceError> {\n // replay 与 materialize 复用同一份执行规约化规格,\n // 保证复算的是同一执行面,而不是二次解释 worldview 语义。\n // 这保证了:同事实闭包 + 同执行规格 -> 同信号产物排序与内容。\n let replay_contract = self\n .journal\n .query_replay_contract(contract_ref)\n .ok_or_else(|| ServiceError::ContractNotFound(contract_ref.clone()))?;\n\n if replay_contract.replay_anchor.lineage_ref.trim().is_empty() {\n return Err(ServiceError::ReplayMismatch {\n field: \"lineage_ref\".to_string(),\n expected: \"non-empty\".to_string(),\n actual: \"empty\".to_string(),\n });\n }\n\n check_match(\n \"worldview_pin\",\n &format!(\"{:?}\", replay_contract.worldview_pin),\n &format!(\"{:?}\", execution_spec.worldview_pin),\n )?;\n check_match(\n \"execution_spec_fingerprint\",\n &replay_contract.execution_spec_fingerprint,\n &execution_spec_fingerprint(execution_spec),\n )?;\n\n let batch = self.get_batch(&replay_contract.replay_anchor.batch_id)?;\n check_match(\n \"source_fingerprint\",\n &format!(\"{:?}\", replay_contract.replay_anchor.source_fingerprint),\n &format!(\"{:?}\", batch.source_fingerprint),\n )?;\n check_match(\n \"time_view_id\",\n &replay_contract.replay_anchor.time_view_id,\n &batch.time_view_id,\n )?;\n\n let (signal_samples, _decision, _sidecar) = signal::project_facts(\n &batch.fact_rows,\n execution_spec,\n &batch.time_view_id,\n )?;\n\n let materialized = SignalBatch {\n batch_id: batch.batch_id.clone(),\n worldview_pin: execution_spec.worldview_pin.clone(),\n time_view_id: batch.time_view_id.clone(),\n source_fingerprint: batch.source_fingerprint.clone(),\n code_version: replay_contract.replay_anchor.code_version.clone(),\n symbol: batch.symbol.clone(),\n signal_samples: signal_samples,\n replay_anchor: replay_contract.replay_anchor.clone(),\n };\n\n let receipt = ReplayReceipt {\n batch_id: batch.batch_id.clone(),\n replay_contract_ref: contract_ref.clone(),\n };\n\n Ok((materialized, receipt))\n }\n\n pub fn get_batch(&self, batch_id: &BatchId) -> Result {\n self.journal\n .query_fact_batch(batch_id)\n .ok_or_else(|| ServiceError::BatchNotFound(batch_id.clone()))\n }\n}\n\nfn check_match(field: &str, expected: &str, actual: &str) -> Result<(), ServiceError> {\n if expected != actual {\n return Err(ServiceError::ReplayMismatch {\n field: field.to_owned(),\n expected: expected.to_owned(),\n actual: actual.to_owned(),\n });\n }\n Ok(())\n}\n\nfn execution_spec_fingerprint(spec: &WorldviewExecutionSpec) -> String {\n let mut entries = Vec::with_capacity(spec.signal_keys.len());\n for key_spec in &spec.signal_keys {\n let value_domain = format!(\"{:?}\", key_spec.value_domain);\n entries.push(format!(\n \"{}:{}|{}:{}|{}:{}\",\n key_spec.key.len(),\n key_spec.key,\n value_domain.len(),\n value_domain,\n key_spec.description.len(),\n key_spec.description\n ));\n }\n entries.sort_unstable();\n let mut canonical = format!(\"signal_keys_count={};\", entries.len());\n canonical.push_str(&entries.join(\";\"));\n format!(\"esf:v1:{:016x}\", fnv1a64(canonical.as_bytes()))\n}\n\nfn fnv1a64(input: &[u8]) -> u64 {\n let mut hash: u64 = 0xcbf29ce484222325;\n for byte in input {\n hash ^= u64::from(*byte);\n hash = hash.wrapping_mul(0x100000001b3);\n }\n hash\n}\n\nfn period_to_provider_str(period: &TimePeriod) -> &'static str {\n match period {\n TimePeriod::Min1 => \"1m\",\n TimePeriod::Min5 => \"5m\",\n TimePeriod::Min15 => \"15m\",\n TimePeriod::Min30 => \"30m\",\n TimePeriod::Hour1 => \"1h\",\n TimePeriod::Hour4 => \"4h\",\n TimePeriod::Daily => \"1d\",\n TimePeriod::Weekly => \"1w\",\n TimePeriod::Monthly => \"1M\",\n }\n}\n\nfn validate_source_fingerprint(fp: &SourceFingerprint) -> Result<(), ServiceError> {\n let s = fp.0.trim();\n if s.is_empty() {\n return Err(ServiceError::Journal(\n \"source_fingerprint must not be empty\".to_string(),\n ));\n }\n if s.contains('/')\n || s.contains('\\\\')\n || s.contains('#')\n || s.contains('\\n')\n || s.contains('\\t')\n {\n // 路径分隔符/注释符会破坏跨环境可复算性。\n return Err(ServiceError::Journal(\n \"source_fingerprint contains unstable path/comment characters\".to_string(),\n ));\n }\n Ok(())\n}\n\n#[cfg(test)]\nmod tests {\n use std::cell::RefCell;\n use std::collections::BTreeMap;\n use std::rc::Rc;\n\n use super::*;\n use crate::adapter::{RawRow, RawValue};\n use crate::model::{BatchId, FactBatch, IngestReceipt, ReplayContract, ReplayContractRef};\n use crate::fact::journal_port::{JournalPort, MemoryJournal};\n use materia::identity::{PinRef, Version, WorldviewId};\n use materia::worldview::{SignalKeySpec, ValueDomain};\n\n #[derive(Clone, Default)]\n struct SharedMemoryJournal(Rc>);\n\n impl JournalPort for SharedMemoryJournal {\n fn append_fact_batch(&mut self, batch: FactBatch) -> Result<(), String> {\n self.0.borrow_mut().append_fact_batch(batch)\n }\n\n fn append_ingest(&mut self, receipt: IngestReceipt) -> Result<(), String> {\n self.0.borrow_mut().append_ingest(receipt)\n }\n\n fn append_replay_contract(\n &mut self,\n contract: ReplayContract,\n ) -> Result {\n self.0.borrow_mut().append_replay_contract(contract)\n }\n\n fn query_fact_batch(&self, batch_id: &BatchId) -> Option {\n self.0.borrow().query_fact_batch(batch_id)\n }\n\n fn query_replay_contract(&self, contract_ref: &ReplayContractRef) -> Option {\n self.0.borrow().query_replay_contract(contract_ref)\n }\n\n fn query_ingest(&self, batch_id: &BatchId) -> Option {\n self.0.borrow().query_ingest(batch_id)\n }\n }\n\n fn live_execution_spec() -> WorldviewExecutionSpec {\n WorldviewExecutionSpec {\n worldview_pin: PinRef {\n id: WorldviewId(\"wv-live-okx\".to_string()),\n version: Version(1),\n },\n signal_keys: vec![\n SignalKeySpec {\n key: \"bar:open\".to_string(),\n value_domain: ValueDomain::Continuous,\n description: \"open;version_id=v1;catalog_ref=core-bars\".to_string(),\n },\n SignalKeySpec {\n key: \"bar:high\".to_string(),\n value_domain: ValueDomain::Continuous,\n description: \"high;version_id=v1;catalog_ref=core-bars\".to_string(),\n },\n SignalKeySpec {\n key: \"bar:low\".to_string(),\n value_domain: ValueDomain::Continuous,\n description: \"low;version_id=v1;catalog_ref=core-bars\".to_string(),\n },\n SignalKeySpec {\n key: \"bar:close\".to_string(),\n value_domain: ValueDomain::Continuous,\n description: \"close;version_id=v1;catalog_ref=core-bars\".to_string(),\n },\n SignalKeySpec {\n key: \"bar:volume\".to_string(),\n value_domain: ValueDomain::Continuous,\n description: \"volume;version_id=v1;catalog_ref=core-bars\".to_string(),\n },\n ],\n }\n }\n\n fn local_rows() -> Vec {\n let mut row1_fields = BTreeMap::new();\n row1_fields.insert(\"bar:open\".to_string(), RawValue::F64(1.0));\n row1_fields.insert(\"bar:high\".to_string(), RawValue::F64(1.2));\n row1_fields.insert(\"bar:low\".to_string(), RawValue::F64(0.9));\n row1_fields.insert(\"bar:close\".to_string(), RawValue::F64(1.1));\n row1_fields.insert(\"bar:volume\".to_string(), RawValue::F64(100.0));\n\n let mut row2_fields = BTreeMap::new();\n row2_fields.insert(\"bar:open\".to_string(), RawValue::F64(1.1));\n row2_fields.insert(\"bar:high\".to_string(), RawValue::F64(1.3));\n row2_fields.insert(\"bar:low\".to_string(), RawValue::F64(1.0));\n row2_fields.insert(\"bar:close\".to_string(), RawValue::F64(1.2));\n row2_fields.insert(\"bar:volume\".to_string(), RawValue::F64(120.0));\n\n vec![\n RawRow {\n adapter: \"okx\".to_string(),\n family: InterfaceFamily::Kline,\n symbol: \"BTCUSDT\".to_string(),\n timestamp_ms: 1_700_000_000_000,\n fields: row1_fields,\n },\n RawRow {\n adapter: \"okx\".to_string(),\n family: InterfaceFamily::Kline,\n symbol: \"BTCUSDT\".to_string(),\n timestamp_ms: 1_700_000_060_000,\n fields: row2_fields,\n },\n ]\n }\n\n #[test]\n fn replay_rejects_execution_spec_fingerprint_mismatch() {\n let mut service = IngestService::new();\n let ingest_receipt = service\n .ingest_from_rows(\n local_rows(),\n \"BTCUSDT\",\n \"tv/1m\".to_string(),\n SourceFingerprint(\"okx-local-btcusdt\".to_string()),\n CodeVersion(\"v0.1-local\".to_string()),\n )\n .expect(\"local ingest should succeed\");\n\n let spec = live_execution_spec();\n let (_materialized, materialize_receipt) = service\n .materialize(\n &ingest_receipt.batch_id,\n &spec,\n CodeVersion(\"v0.1-local\".to_string()),\n )\n .expect(\"materialize should succeed\");\n\n let mut drifted_spec = spec.clone();\n drifted_spec.signal_keys[0].description =\n \"open;version_id=v2;catalog_ref=core-bars\".to_string();\n\n let err = service\n .replay(&materialize_receipt.replay_contract_ref, &drifted_spec)\n .expect_err(\"replay should reject execution spec drift\");\n match err {\n ServiceError::ReplayMismatch { field, .. } => {\n assert_eq!(field, \"execution_spec_fingerprint\");\n }\n other => panic!(\"expected ReplayMismatch, got {other:?}\"),\n }\n }\n\n #[test]\n fn replay_works_across_service_instances_via_journal_fact_batch() {\n let shared_journal = SharedMemoryJournal::default();\n let mut writer = IngestService::with_journal(shared_journal.clone());\n let ingest_receipt = writer\n .ingest_from_rows(\n local_rows(),\n \"BTCUSDT\",\n \"tv/1m\".to_string(),\n SourceFingerprint(\"okx-local-btcusdt\".to_string()),\n CodeVersion(\"v0.1-local\".to_string()),\n )\n .expect(\"local ingest should succeed\");\n let spec = live_execution_spec();\n let (_materialized, materialize_receipt) = writer\n .materialize(\n &ingest_receipt.batch_id,\n &spec,\n CodeVersion(\"v0.1-local\".to_string()),\n )\n .expect(\"materialize should succeed\");\n\n let reader = IngestService::with_journal(shared_journal);\n let (replayed, replay_receipt) = reader\n .replay(&materialize_receipt.replay_contract_ref, &spec)\n .expect(\"replay should succeed across service instances\");\n\n assert_eq!(replayed.batch_id, ingest_receipt.batch_id);\n assert_eq!(\n replay_receipt.replay_contract_ref,\n materialize_receipt.replay_contract_ref\n );\n assert_eq!(replayed.signal_samples.len(), 10);\n }\n\n #[test]\n fn materialize_rejects_replay_contract_ref_conflict() {\n let mut service = IngestService::new();\n let ingest_receipt = service\n .ingest_from_rows(\n local_rows(),\n \"BTCUSDT\",\n \"tv/1m\".to_string(),\n SourceFingerprint(\"okx-local-btcusdt\".to_string()),\n CodeVersion(\"v0.1-local\".to_string()),\n )\n .expect(\"local ingest should succeed\");\n let spec = live_execution_spec();\n\n service\n .materialize(\n &ingest_receipt.batch_id,\n &spec,\n CodeVersion(\"v0.1-local\".to_string()),\n )\n .expect(\"first materialize should succeed\");\n\n let err = service\n .materialize(\n &ingest_receipt.batch_id,\n &spec,\n CodeVersion(\"v0.2-local\".to_string()),\n )\n .expect_err(\"second materialize with changed replay anchor must fail\");\n\n match err {\n ServiceError::Journal(msg) => {\n assert!(msg.contains(\"replay contract ref conflict\"));\n }\n other => panic!(\"expected Journal conflict error, got {other:?}\"),\n }\n }\n\n #[test]\n fn ingest_batch_rejects_non_utc_timezone() {\n let mut service = IngestService::new();\n let err = service\n .ingest_batch(\n \"okx\",\n InterfaceFamily::Kline,\n \"BTCUSDT\",\n TimePeriod::Daily,\n Timezone::AsiaShanghai,\n \"tv/1d\".to_string(),\n 5,\n SourceFingerprint(\"okx-live-btcusdt\".to_string()),\n CodeVersion(\"v0.1-live\".to_string()),\n )\n .expect_err(\"non-UTC timezone should be rejected before adapter pull\");\n\n assert!(\n err.to_string().contains(\"unsupported timezone\"),\n \"unexpected error: {err:?}\"\n );\n }\n\n #[test]\n #[ignore = \"requires external network and OKX public API availability\"]\n fn live_okx_ingest_materialize_replay() {\n let mut service = IngestService::new();\n let ingest_receipt = service\n .ingest_batch(\n \"okx\",\n InterfaceFamily::Kline,\n \"BTCUSDT\",\n TimePeriod::Daily,\n Timezone::Utc,\n \"tv/1d\".to_string(),\n 5,\n SourceFingerprint(\"okx-live-btcusdt\".to_string()),\n CodeVersion(\"v0.1-live\".to_string()),\n )\n .expect(\"okx ingest should return bars\");\n\n let spec = live_execution_spec();\n let (materialized, materialize_receipt) = service\n .materialize(\n &ingest_receipt.batch_id,\n &spec,\n CodeVersion(\"v0.1-live\".to_string()),\n )\n .expect(\"materialize should succeed\");\n\n assert!(!service.get_batch(&ingest_receipt.batch_id).unwrap().fact_rows.is_empty());\n assert_eq!(\n materialized.signal_samples.len(),\n service.get_batch(&ingest_receipt.batch_id).unwrap().fact_count * 5\n );\n\n let (replayed, replay_receipt) = service\n .replay(&materialize_receipt.replay_contract_ref, &spec)\n .expect(\"replay should succeed\");\n\n assert_eq!(replayed.signal_samples.len(), materialized.signal_samples.len());\n assert_eq!(\n replay_receipt.replay_contract_ref,\n materialize_receipt.replay_contract_ref\n );\n }\n}\n\n\n# market-ingest/src/signal/lineage_sidecar.rs (15 lines, ~95 tokens) [full]\n\nuse crate::model::WorldviewExecutionSpec;\n\n#[derive(Clone, Debug, PartialEq, Eq)]\npub struct LineageSidecar {\n pub worldview_version: u64,\n pub notes: Vec,\n}\n\npub fn build(spec: &WorldviewExecutionSpec) -> LineageSidecar {\n LineageSidecar {\n worldview_version: spec.worldview_pin.version.0,\n notes: vec![\"execution-sidecar\".to_string()],\n }\n}\n\n\n# market-ingest/src/AGENTS.md (82 lines, ~927 tokens) [full]\n\n# market-ingest/src 总体工作协议\n\n## 这个文件解决什么\n\n- 给 `src/` 提供统一入口说明。\n- 固定主链、边界和红线,避免跨目录语义漂移。\n- 细节实现去各子域 `AGENTS.md`,这里不重复展开。\n\n## 目录形状\n\n```text\nmarket-ingest/src\n├── adapter/ # 数据源差异收口;RawRow 输出;python host/registry/provider 实现\n│ ├── registry.rs # adapter + family 路由,来源字段校验\n│ ├── python_host.rs # Python 模块加载与调用\n│ └── python/ # okx/tickflow provider 代码与能力声明\n├── fact/ # 事实闭包:类型收敛 + 物理校验 + batch 组装 + journal 端口\n│ ├── raw_to_fact.rs # RawRow -> FactRow\n│ ├── row_invariants.rs # 时间戳/数值合法性校验\n│ ├── batch_builder.rs # BatchId + ReplayAnchor + IngestReceipt\n│ └── journal_port.rs # fact batch / ingest / replay contract 存取\n├── signal/ # 执行解释内核\n│ ├── spec_gate.rs # 规格充分性判定,缺 key 直接 reject\n│ ├── fact_to_signal.rs # FactRow -> SignalSample 映射与排序\n│ └── lineage_sidecar.rs # 执行侧证据\n├── model/ # 共享执行对象\n│ ├── refs.rs # BatchId/TimeViewId/TimePeriod/Timezone/ReplayAnchor\n│ ├── batch.rs # FactRow/FactBatch/SignalSample/SignalBatch/ExecutionSpec\n│ └── receipt.rs # ReplayContract + 各类 Receipt\n├── service.rs # ingest/materialize/replay 唯一编排入口\n└── lib.rs # 模块导出\n```\n\n## 模块职责\n\n- `adapter/`:来源差异收口,输出统一 `RawRow`。\n- `fact/`:`RawRow -> FactRow`,物理校验,组装 `FactBatch` 与 `ReplayAnchor`。\n- `signal/`:统一解释内核(`spec_gate + fact_to_signal`),输出 `SignalSample`。\n- `model/`:共享执行对象(refs/batch/receipt)。\n- `service.rs`:`ingest/materialize/replay` 唯一编排入口。\n\n## 主链调用\n\n- ingest\n - `service::ingest_batch | ingest_from_rows`\n - `adapter::fetch_raw_rows`\n - `fact::normalize_rows`\n - `fact::assemble_normalized_batch`\n - `journal.append_fact_batch + append_ingest`\n- materialize\n - `service::materialize`\n - `journal.query_fact_batch`\n - `signal::project_facts(spec_gate -> fact_to_signal)`\n - `build ReplayContract(execution_spec_fingerprint)`\n - `journal.append_replay_contract`\n - `SignalBatch(signal_samples)`\n- replay\n - `service::replay`\n - `journal.query_replay_contract`\n - 校验 `worldview_pin + execution_spec_fingerprint + source_fingerprint + time_view_id`\n - `signal::project_facts`\n - `ReplayReceipt`\n\n## 全局边界\n\n- `materia::Worldview` 是治理声明对象;`market-ingest` 只消费执行规格:`worldview_pin + signal_keys`。\n- 解释路径必须统一为 `gate + mapper`;禁止 worldview case-by-case 分支。\n- canonical key owner 在 adapter mapper;`fact/raw_to_fact` 不做二次 key 重命名。\n- 时间顺序 owner 在 adapter;fact/signal 不做第二套排序语义。\n\n## 非主链说明\n\n- `channel.rs` 当前不在主链调用图中。\n- 若接入主链,必须在本文件与对应子域文档同步更新调用路径。\n- 若不接入,保持隔离并避免被误用为隐式入口。\n\n## 红线\n\n- 不改治理语义,不把 worldview 解释逻辑拉进 ingest/adapter。\n- 不新增补偿层来掩盖边界错误;优先删除、合并、收紧边界。\n- replay 合同一致性锚必须保留并强校验:`execution_spec_fingerprint`。\n", + "metadata": { + "truncated": false + }, + "title": "", + "time": { + "start": 1775181853170, + "end": 1775181853197 + }, + "attachments": [] + }, + "id": "prt_d51156da4001bTf4JXkZBlwq4Z", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d51155e70001ph14LyziLptflQ" + }, + { + "type": "tool", + "tool": "tilth_tilth_read", + "callID": "tooluse_64Q3O67KiHWkxuFYVq6dYB", + "state": { + "status": "completed", + "input": { + "path": "alchemist/tests/e2e_phase0.rs", + "section": "370-395" + }, + "output": "# alchemist/tests/e2e_phase0.rs (20 lines, ~171 tokens) [section]\n\n370 let must_exist = [\n371 format!(\"{manifest_dir}/src/bin/alchemist.rs\"),\n372 format!(\"{manifest_dir}/src/cli/mod.rs\"),\n373 format!(\"{manifest_dir}/src/cli/worldview.rs\"),\n374 format!(\"{manifest_dir}/src/cli/render.rs\"),\n375 format!(\"{manifest_dir}/src/cli/interactive.rs\"),\n376 format!(\"{manifest_dir}/src/cli/confirm.rs\"),\n377 format!(\"{manifest_dir}/src/model/mod.rs\"),\n378 format!(\"{manifest_dir}/src/model/worldview.rs\"),\n379 format!(\"{manifest_dir}/src/error.rs\"),\n380 ];\n381 \n382 for file in must_exist {\n383 assert!(\n384 std::path::Path::new(&file).exists(),\n385 \"expected file does not exist: {file}\"\n386 );\n387 }\n388 }\n", + "metadata": { + "truncated": false + }, + "title": "", + "time": { + "start": 1775181853548, + "end": 1775181853551 + }, + "attachments": [] + }, + "id": "prt_d511571f9001IhBLUR3AIKEYbO", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d51155e70001ph14LyziLptflQ" + }, + { + "reason": "tool-calls", + "snapshot": "6d93682cd59dc26937a236a546761e2c47156e5d", + "type": "step-finish", + "tokens": { + "total": 56221, + "input": 4120, + "output": 225, + "reasoning": 0, + "cache": { + "write": 0, + "read": 51876 + } + }, + "cost": 0, + "id": "prt_d51157391001XyWKSRTMiAE2uc", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d51155e70001ph14LyziLptflQ" + } + ] + }, + { + "info": { + "parentID": "msg_d51155df10017sSiPpalfUA2GR", + "role": "assistant", + "mode": "orchestrator", + "agent": "orchestrator", + "path": { + "cwd": "/Users/oujinsai/Projects/cupel", + "root": "/Users/oujinsai/Projects/cupel" + }, + "cost": 0, + "tokens": { + "total": 66781, + "input": 10705, + "output": 81, + "reasoning": 0, + "cache": { + "write": 0, + "read": 55995 + } + }, + "modelID": "claude-sonnet-4.6", + "providerID": "github-copilot", + "time": { + "created": 1775181853650, + "completed": 1775181858280 + }, + "finish": "tool-calls", + "id": "msg_d511573d2001KVLX6Z3Ih2704v", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S" + }, + "parts": [ + { + "snapshot": "6d93682cd59dc26937a236a546761e2c47156e5d", + "type": "step-start", + "id": "prt_d51157ff2001rAmYvikn1EIcxZ", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d511573d2001KVLX6Z3Ih2704v" + }, + { + "type": "text", + "text": "现在检查 alchemist 实际文件结构:", + "time": { + "start": 1775181858221, + "end": 1775181858221 + }, + "id": "prt_d51157ff8001hdgbyZ7lDR1XU1", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d511573d2001KVLX6Z3Ih2704v" + }, + { + "type": "tool", + "tool": "tilth_tilth_files", + "callID": "tooluse_Zaww2beC0az8ciR4gO0pkM", + "state": { + "status": "completed", + "input": { + "pattern": "alchemist/src/**/*.rs" + }, + "output": "# Glob: \"alchemist/src/**/*.rs\" in . — 13 files\n alchemist/src/commands.rs (~4009 tokens)\n alchemist/src/cli/interactive.rs (~1696 tokens)\n alchemist/src/cli/confirm.rs (~19 tokens)\n alchemist/src/cli/mod.rs (~662 tokens)\n alchemist/src/cli/worldview.rs (~3399 tokens)\n alchemist/src/cli/completion.rs (~1380 tokens)\n alchemist/src/cli/render.rs (~924 tokens)\n alchemist/src/lib.rs (~70 tokens)\n alchemist/src/error.rs (~1030 tokens)\n alchemist/src/bin/alchemist.rs (~38 tokens)\n alchemist/src/registry.rs (~1625 tokens)\n alchemist/src/contract/mod.rs (~5 tokens)\n alchemist/src/contract/worldview.rs (~482 tokens)", + "metadata": { + "truncated": false + }, + "title": "", + "time": { + "start": 1775181858216, + "end": 1775181858230 + }, + "attachments": [] + }, + "id": "prt_d5115811e001V9dLt1bM051ID6", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d511573d2001KVLX6Z3Ih2704v" + }, + { + "reason": "tool-calls", + "snapshot": "6d93682cd59dc26937a236a546761e2c47156e5d", + "type": "step-finish", + "tokens": { + "total": 66781, + "input": 10705, + "output": 81, + "reasoning": 0, + "cache": { + "write": 0, + "read": 55995 + } + }, + "cost": 0, + "id": "prt_d511585b70014GaH1h3c12Q2R6", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d511573d2001KVLX6Z3Ih2704v" + } + ] + }, + { + "info": { + "parentID": "msg_d51155df10017sSiPpalfUA2GR", + "role": "assistant", + "mode": "orchestrator", + "agent": "orchestrator", + "path": { + "cwd": "/Users/oujinsai/Projects/cupel", + "root": "/Users/oujinsai/Projects/cupel" + }, + "cost": 0, + "tokens": { + "total": 67420, + "input": 382, + "output": 339, + "reasoning": 0, + "cache": { + "write": 0, + "read": 66699 + } + }, + "modelID": "claude-sonnet-4.6", + "providerID": "github-copilot", + "time": { + "created": 1775181858286, + "completed": 1775181866725 + }, + "finish": "tool-calls", + "id": "msg_d511585ee001ICUVdlLM8kmKcx", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S" + }, + "parts": [ + { + "snapshot": "6d93682cd59dc26937a236a546761e2c47156e5d", + "type": "step-start", + "id": "prt_d51159388001tPXL9Bs1hPIy8f", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d511585ee001ICUVdlLM8kmKcx" + }, + { + "type": "text", + "text": "情况清楚了。一共 6 处,现在逐一处理:", + "time": { + "start": 1775181866634, + "end": 1775181866634 + }, + "id": "prt_d5115938e0019Ak3r337ZV20L5", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d511585ee001ICUVdlLM8kmKcx" + }, + { + "type": "tool", + "tool": "todowrite", + "callID": "tooluse_wrjiv5B3PpuZYvFGXsQxaO", + "state": { + "status": "error", + "input": { + "todos": "[{\"content\": \"1. 修复 e2e_phase0.rs bd06 测试:把期望路径从 model/ 改回 contract/\", \"status\": \"pending\", \"priority\": \"high\"}, {\"content\": \"2. 删除 raw_to_fact.rs 中 assemble_normalized_batch 中转层,fact/mod.rs 改为从 batch_builder 直接 re-export\", \"status\": \"pending\", \"priority\": \"high\"}, {\"content\": \"3. fact/mod.rs 注释\"三件事\"改为\"两件事\"\", \"status\": \"pending\", \"priority\": \"medium\"}, {\"content\": \"4. service.rs 两处 signal_samples: signal_samples 简写\", \"status\": \"pending\", \"priority\": \"low\"}, {\"content\": \"5. lineage_sidecar.rs 补 module doc\", \"status\": \"pending\", \"priority\": \"medium\"}, {\"content\": \"6. AGENTS.md 删除 channel.rs 相关段落\", \"status\": \"pending\", \"priority\": \"low\"}]" + }, + "error": "The todowrite tool was called with invalid arguments: [\n {\n \"expected\": \"array\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"todos\"\n ],\n \"message\": \"Invalid input: expected array, received string\"\n }\n].\nPlease rewrite the input so it satisfies the expected schema.", + "time": { + "start": 1775181866618, + "end": 1775181866636 + } + }, + "id": "prt_d5115966d001TW3y5jipTIul6j", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d511585ee001ICUVdlLM8kmKcx" + }, + { + "reason": "tool-calls", + "snapshot": "6d93682cd59dc26937a236a546761e2c47156e5d", + "type": "step-finish", + "tokens": { + "total": 67420, + "input": 382, + "output": 339, + "reasoning": 0, + "cache": { + "write": 0, + "read": 66699 + } + }, + "cost": 0, + "id": "prt_d5115a68d001GLxKh5vc4Pz3FX", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d511585ee001ICUVdlLM8kmKcx" + } + ] + }, + { + "info": { + "parentID": "msg_d51155df10017sSiPpalfUA2GR", + "role": "assistant", + "mode": "orchestrator", + "agent": "orchestrator", + "path": { + "cwd": "/Users/oujinsai/Projects/cupel", + "root": "/Users/oujinsai/Projects/cupel" + }, + "cost": 0, + "tokens": { + "total": 67827, + "input": 451, + "output": 296, + "reasoning": 0, + "cache": { + "write": 0, + "read": 67080 + } + }, + "modelID": "claude-sonnet-4.6", + "providerID": "github-copilot", + "time": { + "created": 1775181866731, + "completed": 1775181871876 + }, + "finish": "tool-calls", + "id": "msg_d5115a6eb001NAswwO1s579HJR", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S" + }, + "parts": [ + { + "snapshot": "6d93682cd59dc26937a236a546761e2c47156e5d", + "type": "step-start", + "id": "prt_d5115b4b60017pRA52jl286kee", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d5115a6eb001NAswwO1s579HJR" + }, + { + "type": "tool", + "tool": "todowrite", + "callID": "tooluse_It89r5POl6cB9MTcBCxSbl", + "state": { + "status": "completed", + "input": { + "todos": [ + { + "content": "修复 e2e_phase0.rs bd06 测试:把期望路径从 model/ 改回 contract/", + "status": "in_progress", + "priority": "high" + }, + { + "content": "删除 raw_to_fact.rs 中 assemble_normalized_batch 中转层,fact/mod.rs 直接从 batch_builder re-export", + "status": "pending", + "priority": "high" + }, + { + "content": "fact/mod.rs 注释三件事改为两件事", + "status": "pending", + "priority": "medium" + }, + { + "content": "service.rs 两处 signal_samples: signal_samples 简写", + "status": "pending", + "priority": "low" + }, + { + "content": "lineage_sidecar.rs 补 module doc", + "status": "pending", + "priority": "medium" + }, + { + "content": "AGENTS.md 删除 channel.rs 相关段落", + "status": "pending", + "priority": "low" + } + ] + }, + "output": "[\n {\n \"content\": \"修复 e2e_phase0.rs bd06 测试:把期望路径从 model/ 改回 contract/\",\n \"status\": \"in_progress\",\n \"priority\": \"high\"\n },\n {\n \"content\": \"删除 raw_to_fact.rs 中 assemble_normalized_batch 中转层,fact/mod.rs 直接从 batch_builder re-export\",\n \"status\": \"pending\",\n \"priority\": \"high\"\n },\n {\n \"content\": \"fact/mod.rs 注释三件事改为两件事\",\n \"status\": \"pending\",\n \"priority\": \"medium\"\n },\n {\n \"content\": \"service.rs 两处 signal_samples: signal_samples 简写\",\n \"status\": \"pending\",\n \"priority\": \"low\"\n },\n {\n \"content\": \"lineage_sidecar.rs 补 module doc\",\n \"status\": \"pending\",\n \"priority\": \"medium\"\n },\n {\n \"content\": \"AGENTS.md 删除 channel.rs 相关段落\",\n \"status\": \"pending\",\n \"priority\": \"low\"\n }\n]", + "metadata": { + "todos": [ + { + "content": "修复 e2e_phase0.rs bd06 测试:把期望路径从 model/ 改回 contract/", + "status": "in_progress", + "priority": "high" + }, + { + "content": "删除 raw_to_fact.rs 中 assemble_normalized_batch 中转层,fact/mod.rs 直接从 batch_builder re-export", + "status": "pending", + "priority": "high" + }, + { + "content": "fact/mod.rs 注释三件事改为两件事", + "status": "pending", + "priority": "medium" + }, + { + "content": "service.rs 两处 signal_samples: signal_samples 简写", + "status": "pending", + "priority": "low" + }, + { + "content": "lineage_sidecar.rs 补 module doc", + "status": "pending", + "priority": "medium" + }, + { + "content": "AGENTS.md 删除 channel.rs 相关段落", + "status": "pending", + "priority": "low" + } + ], + "truncated": false + }, + "title": "6 todos", + "time": { + "start": 1775181871807, + "end": 1775181871818 + } + }, + "id": "prt_d5115b4bb001ZJMP7Dv3g60v0d", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d5115a6eb001NAswwO1s579HJR" + }, + { + "reason": "tool-calls", + "snapshot": "6d93682cd59dc26937a236a546761e2c47156e5d", + "type": "step-finish", + "tokens": { + "total": 67827, + "input": 451, + "output": 296, + "reasoning": 0, + "cache": { + "write": 0, + "read": 67080 + } + }, + "cost": 0, + "id": "prt_d5115bacb0016T6UnNmF4Id9db", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d5115a6eb001NAswwO1s579HJR" + } + ] + }, + { + "info": { + "parentID": "msg_d51155df10017sSiPpalfUA2GR", + "role": "assistant", + "mode": "orchestrator", + "agent": "orchestrator", + "path": { + "cwd": "/Users/oujinsai/Projects/cupel", + "root": "/Users/oujinsai/Projects/cupel" + }, + "cost": 0, + "tokens": { + "total": 68513, + "input": 787, + "output": 196, + "reasoning": 0, + "cache": { + "write": 0, + "read": 67530 + } + }, + "modelID": "claude-sonnet-4.6", + "providerID": "github-copilot", + "time": { + "created": 1775181871880, + "completed": 1775181877367 + }, + "finish": "tool-calls", + "id": "msg_d5115bb080017VWT8D3Jl1K6rd", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S" + }, + "parts": [ + { + "snapshot": "6d93682cd59dc26937a236a546761e2c47156e5d", + "type": "step-start", + "id": "prt_d5115c7f3001y4gugOeg24R7Do", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d5115bb080017VWT8D3Jl1K6rd" + }, + { + "type": "text", + "text": "**1. 修复 bd06 测试路径**", + "time": { + "start": 1775181877160, + "end": 1775181877160 + }, + "id": "prt_d5115c7f5001E4QlkWgUXQ6jxR", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d5115bb080017VWT8D3Jl1K6rd" + }, + { + "type": "tool", + "tool": "edit", + "callID": "tooluse_kOQ1xVxth7h3R5ao2drnB8", + "state": { + "status": "error", + "input": { + "filePath": "/Users/oujinsai/Projects/cupel/alchemist/tests/e2e_phase0.rs", + "oldString": " format!(\"{manifest_dir}/src/model/mod.rs\"),\n format!(\"{manifest_dir}/src/model/worldview.rs\"),", + "newString": " format!(\"{manifest_dir}/src/contract/mod.rs\"),\n format!(\"{manifest_dir}/src/contract/worldview.rs\")," + }, + "error": "You must read file /Users/oujinsai/Projects/cupel/alchemist/tests/e2e_phase0.rs before overwriting it. Use the Read tool first", + "time": { + "start": 1775181877055, + "end": 1775181877164 + } + }, + "id": "prt_d5115c95d001YBCBOCOyGBdm5G", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d5115bb080017VWT8D3Jl1K6rd" + }, + { + "reason": "tool-calls", + "snapshot": "6d93682cd59dc26937a236a546761e2c47156e5d", + "type": "step-finish", + "tokens": { + "total": 68513, + "input": 787, + "output": 196, + "reasoning": 0, + "cache": { + "write": 0, + "read": 67530 + } + }, + "cost": 0, + "id": "prt_d5115cfae001cPzeTLUcg8y7Bi", + "sessionID": "ses_2b405d87affeP3uEW9YwSCRL0S", + "messageID": "msg_d5115bb080017VWT8D3Jl1K6rd" + } + ] + } +] \ No newline at end of file diff --git a/tests/replay/todowrite_malformed_session_spec.lua b/tests/replay/todowrite_malformed_session_spec.lua new file mode 100644 index 00000000..815c5239 --- /dev/null +++ b/tests/replay/todowrite_malformed_session_spec.lua @@ -0,0 +1,49 @@ +local helpers = require('tests.helpers') +local state = require('opencode.state') +local renderer = require('opencode.ui.renderer') +local flush = require('opencode.ui.renderer.flush') +local ctx = require('opencode.ui.renderer.ctx') +local output_window = require('opencode.ui.output_window') + +describe('replay malformed todowrite session fixture', function() + before_each(function() + helpers.replay_setup() + end) + + after_each(function() + if ctx.bulk_mode then + flush.end_bulk_mode() + end + end) + + it('renders real malformed todowrite payload without crashing', function() + local session_data = helpers.load_test_data('tests/data/todowrite-malformed-session.json') + + assert.is_true(type(session_data) == 'table' and #session_data > 0) + + local malformed_found = false + for _, msg in ipairs(session_data) do + for _, part in ipairs(msg.parts or {}) do + if part.tool == 'todowrite' then + local todos = part.state and part.state.input and part.state.input.todos + if type(todos) == 'string' then + malformed_found = true + end + end + end + end + assert.is_true(malformed_found) + + state.session.set_active({ id = session_data[1].info.sessionID }) + + local ok, err = pcall(function() + renderer._render_full_session_data(session_data) + end) + + assert.is_true(ok, tostring(err)) + assert.is_false(ctx.bulk_mode) + + local actual = helpers.capture_output(state.windows and state.windows.output_buf, output_window.namespace) + assert.is_true(#(actual.lines or {}) > 0) + end) +end) From 86ab539687d2cea0e3d5409ad4656e10ff346b23 Mon Sep 17 00:00:00 2001 From: Jensen Date: Tue, 7 Apr 2026 16:56:18 +0800 Subject: [PATCH 2/4] Update lua/opencode/ui/renderer/flush.lua Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- lua/opencode/ui/renderer/flush.lua | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lua/opencode/ui/renderer/flush.lua b/lua/opencode/ui/renderer/flush.lua index 2db40c03..d31d9fbd 100644 --- a/lua/opencode/ui/renderer/flush.lua +++ b/lua/opencode/ui/renderer/flush.lua @@ -28,7 +28,8 @@ local function warn_part_render_error_once(part_id, message_id, err) ) vim.schedule(function() - vim.notify_once('[opencode.nvim] ' .. msg, vim.log.levels.WARN) + local notify = vim.notify_once or vim.notify + notify('[opencode.nvim] ' .. msg, vim.log.levels.WARN) end) end From 04178b75d7142f3e4cdede3c520480e0d9179167 Mon Sep 17 00:00:00 2001 From: Jensen Date: Tue, 7 Apr 2026 16:56:29 +0800 Subject: [PATCH 3/4] Update tests/data/todowrite-malformed-session.json Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/data/todowrite-malformed-session.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/data/todowrite-malformed-session.json b/tests/data/todowrite-malformed-session.json index 6a893514..9b542260 100644 --- a/tests/data/todowrite-malformed-session.json +++ b/tests/data/todowrite-malformed-session.json @@ -6,8 +6,8 @@ "mode": "orchestrator", "agent": "orchestrator", "path": { - "cwd": "/Users/oujinsai/Projects/cupel", - "root": "/Users/oujinsai/Projects/cupel" + "cwd": "/mock/cupel", + "root": "/mock/cupel" }, "cost": 0, "tokens": { From 84e1635ca7dccd27509c16b45bf87812ca29c084 Mon Sep 17 00:00:00 2001 From: Jensen Date: Tue, 7 Apr 2026 16:56:51 +0800 Subject: [PATCH 4/4] Update tests/data/todowrite-malformed-session.json Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/data/todowrite-malformed-session.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/data/todowrite-malformed-session.json b/tests/data/todowrite-malformed-session.json index 9b542260..28599ae0 100644 --- a/tests/data/todowrite-malformed-session.json +++ b/tests/data/todowrite-malformed-session.json @@ -66,9 +66,9 @@ }, "output": "# market-ingest/src/fact/mod.rs (14 lines, ~121 tokens) [full]\n\n//! fact 闭包:把来源差异在这里终结为 canonical fact(FactRow)。\n//!\n//! 这里只负责三件事:\n//! - 归一化与物理不变量校验(raw_to_fact/row_invariants)。\n//! - 组装批次与回放锚点,并写入 journal(batch_builder/journal_port)。\n\npub mod batch_builder;\npub mod journal_port;\npub mod raw_to_fact;\npub mod row_invariants;\n\npub use raw_to_fact::{assemble_normalized_batch, normalize_rows};\npub use row_invariants::NormalizeError;\n\n\n# market-ingest/src/fact/raw_to_fact.rs (108 lines, ~926 tokens) [full]\n\n//! fact raw_to_fact:把 adapter 输出收敛为 `FactRow`。\n//!\n//! 设计动机:\n//! - 这里做“事实形状收敛”,不做 worldview 语义解释。\n//! - key canonicalization 的 owner 在 adapter;normalize 不做字段重命名。\n//! - 这一层只做类型收敛,避免把数据源语义和世界观语义耦合在公共路径上。\n\nuse std::collections::BTreeMap;\n\nuse materia::identity::{CodeVersion, SourceFingerprint};\n\nuse crate::adapter::registry::{RawRow, RawValue};\nuse crate::model::{FactBatch, FactRow, PrimitiveValue, TimeViewId};\n\nuse super::batch_builder;\nuse super::row_invariants;\npub use super::row_invariants::NormalizeError;\n\npub fn normalize_rows(\n rows: Vec,\n) -> Result, NormalizeError> {\n let mut fact_rows = Vec::with_capacity(rows.len());\n\n for (index, row) in rows.into_iter().enumerate() {\n row_invariants::validate_row(index, &row)?;\n\n let mut fields = BTreeMap::new();\n for (key, raw_val) in row.fields {\n let pv = match raw_val {\n RawValue::F64(v) => PrimitiveValue::Float(v),\n RawValue::I64(v) => PrimitiveValue::Int(v),\n RawValue::Text(v) => PrimitiveValue::Text(v),\n RawValue::Bool(v) => PrimitiveValue::Bool(v),\n };\n\n fields.insert(key, pv);\n }\n\n fact_rows.push(FactRow {\n timestamp: row.timestamp_ms,\n fields,\n });\n }\n\n Ok(fact_rows)\n}\n\npub fn assemble_normalized_batch(\n fact_rows: Vec,\n symbol: &str,\n time_view_id: TimeViewId,\n source_fingerprint: SourceFingerprint,\n code_version: CodeVersion,\n) -> Result {\n batch_builder::assemble_normalized_batch(\n fact_rows,\n symbol,\n time_view_id,\n source_fingerprint,\n code_version,\n )\n}\n\n#[cfg(test)]\nmod tests {\n use std::collections::BTreeMap;\n\n use crate::adapter::registry::{InterfaceFamily, RawRow, RawValue};\n use crate::model::PrimitiveValue;\n\n use super::normalize_rows;\n\n #[test]\n fn normalize_maps_values_and_keeps_adapter_keys() {\n let mut fields = BTreeMap::new();\n fields.insert(\"bar:open\".to_string(), RawValue::F64(1.2));\n fields.insert(\"bar:high\".to_string(), RawValue::F64(1.3));\n fields.insert(\"bar:low\".to_string(), RawValue::F64(1.1));\n fields.insert(\"bar:close\".to_string(), RawValue::F64(1.25));\n fields.insert(\"bar:volume\".to_string(), RawValue::I64(10));\n fields.insert(\"source\".to_string(), RawValue::Text(\"okx\".to_string()));\n fields.insert(\"real\".to_string(), RawValue::Bool(true));\n\n let rows = vec![RawRow {\n adapter: \"okx\".to_string(),\n family: InterfaceFamily::Kline,\n symbol: \"BTC-USDT\".to_string(),\n timestamp_ms: 1000,\n fields,\n }];\n\n let fact_rows = normalize_rows(rows).expect(\"normalize ok\");\n let fact = fact_rows.first().expect(\"one fact\");\n\n assert_eq!(fact.timestamp, 1000);\n assert_eq!(fact.fields.get(\"bar:open\"), Some(&PrimitiveValue::Float(1.2)));\n assert_eq!(fact.fields.get(\"bar:high\"), Some(&PrimitiveValue::Float(1.3)));\n assert_eq!(fact.fields.get(\"bar:low\"), Some(&PrimitiveValue::Float(1.1)));\n assert_eq!(fact.fields.get(\"bar:close\"), Some(&PrimitiveValue::Float(1.25)));\n assert_eq!(fact.fields.get(\"bar:volume\"), Some(&PrimitiveValue::Int(10)));\n assert_eq!(\n fact.fields.get(\"source\"),\n Some(&PrimitiveValue::Text(\"okx\".to_string()))\n );\n assert_eq!(fact.fields.get(\"real\"), Some(&PrimitiveValue::Bool(true)));\n }\n}\n\n\n# market-ingest/src/service.rs (664 lines, ~5.9k tokens) [full]\n\n//! Service 层——market-ingest 的唯一对外入口。\n//!\n//! 编排 adapter -> fact -> signal 三条执行链,并以 `ReplayContractRef`\n//! 作为 replay 主入参,避免隐式状态补全。\n//!\n//! 设计原则:\n//! - service 只做编排,不承载 adapter SDK 细节。\n//! - service 不解释 worldview 语义,只消费执行规格。\n//! - replay 只消费显式合同与显式事实,不依赖进程内隐式状态。\n\nuse std::fmt;\n\nuse crate::adapter::{self, InterfaceFamily, RawRow, RegistryError};\nuse crate::model::{\n BatchId, FactBatch, IngestReceipt, MaterializeReceipt, ReplayContract, ReplayContractRef,\n ReplayReceipt, SignalBatch, TimePeriod, TimeViewId, Timezone, WorldviewExecutionSpec,\n};\nuse crate::fact::{self, journal_port::JournalPort, NormalizeError};\nuse crate::signal::{self, ProjectionError};\nuse materia::identity::{CodeVersion, SourceFingerprint};\n\n#[derive(Debug)]\npub enum ServiceError {\n Adapter(RegistryError),\n Normalize(NormalizeError),\n Projection(ProjectionError),\n BatchNotFound(BatchId),\n ContractNotFound(ReplayContractRef),\n Journal(String),\n ReplayMismatch {\n field: String,\n expected: String,\n actual: String,\n },\n}\n\nimpl fmt::Display for ServiceError {\n fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {\n match self {\n Self::Adapter(e) => write!(f, \"adapter: {e}\"),\n Self::Normalize(e) => write!(f, \"normalize: {e}\"),\n Self::Projection(e) => write!(f, \"projection: {e}\"),\n Self::BatchNotFound(id) => write!(f, \"batch not found: {:?}\", id),\n Self::ContractNotFound(id) => write!(f, \"replay contract not found: {:?}\", id),\n Self::Journal(e) => write!(f, \"journal: {e}\"),\n Self::ReplayMismatch {\n field,\n expected,\n actual,\n } => write!(\n f,\n \"replay mismatch on '{field}': expected {expected}, got {actual}\"\n ),\n }\n }\n}\n\nimpl std::error::Error for ServiceError {}\n\nimpl From for ServiceError {\n fn from(value: RegistryError) -> Self {\n Self::Adapter(value)\n }\n}\n\nimpl From for ServiceError {\n fn from(value: NormalizeError) -> Self {\n Self::Normalize(value)\n }\n}\n\nimpl From for ServiceError {\n fn from(value: ProjectionError) -> Self {\n Self::Projection(value)\n }\n}\n\npub struct IngestService {\n // 设计动机:replay 不依赖进程隐式状态,所有可复算事实都通过 journal 显式持久。\n journal: J,\n}\n\nimpl IngestService {\n pub fn new() -> Self {\n Self {\n journal: fact::journal_port::MemoryJournal::default(),\n }\n }\n}\n\nimpl Default for IngestService {\n fn default() -> Self {\n Self::new()\n }\n}\n\nimpl IngestService {\n pub fn with_journal(journal: J) -> Self {\n Self { journal }\n }\n\n pub fn ingest_batch(\n &mut self,\n adapter: &str,\n family: InterfaceFamily,\n symbol: &str,\n period: TimePeriod,\n timezone: Timezone,\n time_view_id: TimeViewId,\n limit: usize,\n source_fingerprint: SourceFingerprint,\n code_version: CodeVersion,\n ) -> Result {\n // source_fingerprint 是跨执行可复算锚点。\n validate_source_fingerprint(&source_fingerprint)?;\n let period_str = period_to_provider_str(&period);\n let timezone_str = timezone.as_provider_str();\n let rows = adapter::fetch_raw_rows(adapter, family, symbol, period_str, timezone_str, limit)?;\n self.ingest_from_rows(\n rows,\n symbol,\n time_view_id,\n source_fingerprint,\n code_version,\n )\n }\n\n pub fn ingest_from_rows(\n &mut self,\n rows: Vec,\n symbol: &str,\n time_view_id: TimeViewId,\n source_fingerprint: SourceFingerprint,\n code_version: CodeVersion,\n ) -> Result {\n validate_source_fingerprint(&source_fingerprint)?;\n let fact_rows = fact::normalize_rows(rows)?;\n let batch = fact::assemble_normalized_batch(\n fact_rows,\n symbol,\n time_view_id,\n source_fingerprint,\n code_version,\n )?;\n self.journal\n .append_fact_batch(batch.clone())\n .map_err(ServiceError::Journal)?;\n let ingest_receipt = fact::batch_builder::build_ingest_receipt(&batch);\n self.journal\n .append_ingest(ingest_receipt.clone())\n .map_err(ServiceError::Journal)?;\n Ok(ingest_receipt)\n }\n\n pub fn materialize(\n &mut self,\n batch_id: &BatchId,\n execution_spec: &WorldviewExecutionSpec,\n code_version: CodeVersion,\n ) -> Result<(SignalBatch, MaterializeReceipt), ServiceError> {\n // 先拿 worldview 无关事实闭包,再做 worldview 绑定投影。\n // 注意:这里使用的是执行规约化规格(worldview_pin + signal_keys),\n // 不是完整 worldview 治理对象。\n // service 只负责编排统一解释内核(gate -> mapper),\n // 把“事实是否满足这份执行需求”跑通;\n // 不在本层解释 owner/dimensions 等治理语义,也不写 worldview 分支。\n let batch = self.get_batch(batch_id)?;\n let (signal_samples, _decision, _sidecar) = signal::project_facts(\n &batch.fact_rows,\n execution_spec,\n &batch.time_view_id,\n )?;\n\n // contract_ref 由稳定锚点确定性复现。\n let contract_ref = ReplayContractRef(format!(\n \"replay:{}:{}:{}\",\n batch.batch_id.0,\n execution_spec.worldview_pin.id.0,\n execution_spec.worldview_pin.version.0\n ));\n let execution_spec_fingerprint = execution_spec_fingerprint(execution_spec);\n let replay_contract = ReplayContract {\n contract_ref: contract_ref.clone(),\n replay_anchor: crate::model::ReplayAnchor {\n batch_id: batch.batch_id.clone(),\n source_fingerprint: batch.source_fingerprint.clone(),\n time_view_id: batch.time_view_id.clone(),\n code_version: code_version.clone(),\n fact_count: batch.fact_count,\n time_range: batch.time_range,\n lineage_ref: format!(\n \"lineage:{}:{}\",\n batch.batch_id.0, batch.source_fingerprint.0\n ),\n },\n worldview_pin: execution_spec.worldview_pin.clone(),\n execution_spec_fingerprint,\n };\n self.journal\n .append_replay_contract(replay_contract.clone())\n .map_err(ServiceError::Journal)?;\n\n // SignalBatch 只承载物化产物;事实体通过 batch_id 回查 FactBatch。\n let materialized = SignalBatch {\n batch_id: batch.batch_id.clone(),\n worldview_pin: execution_spec.worldview_pin.clone(),\n time_view_id: batch.time_view_id.clone(),\n source_fingerprint: batch.source_fingerprint.clone(),\n code_version,\n symbol: batch.symbol.clone(),\n signal_samples: signal_samples,\n replay_anchor: replay_contract.replay_anchor.clone(),\n };\n let receipt = MaterializeReceipt {\n batch_id: batch.batch_id.clone(),\n replay_contract_ref: contract_ref,\n };\n\n Ok((materialized, receipt))\n }\n\n pub fn replay(\n &self,\n contract_ref: &ReplayContractRef,\n execution_spec: &WorldviewExecutionSpec,\n ) -> Result<(SignalBatch, ReplayReceipt), ServiceError> {\n // replay 与 materialize 复用同一份执行规约化规格,\n // 保证复算的是同一执行面,而不是二次解释 worldview 语义。\n // 这保证了:同事实闭包 + 同执行规格 -> 同信号产物排序与内容。\n let replay_contract = self\n .journal\n .query_replay_contract(contract_ref)\n .ok_or_else(|| ServiceError::ContractNotFound(contract_ref.clone()))?;\n\n if replay_contract.replay_anchor.lineage_ref.trim().is_empty() {\n return Err(ServiceError::ReplayMismatch {\n field: \"lineage_ref\".to_string(),\n expected: \"non-empty\".to_string(),\n actual: \"empty\".to_string(),\n });\n }\n\n check_match(\n \"worldview_pin\",\n &format!(\"{:?}\", replay_contract.worldview_pin),\n &format!(\"{:?}\", execution_spec.worldview_pin),\n )?;\n check_match(\n \"execution_spec_fingerprint\",\n &replay_contract.execution_spec_fingerprint,\n &execution_spec_fingerprint(execution_spec),\n )?;\n\n let batch = self.get_batch(&replay_contract.replay_anchor.batch_id)?;\n check_match(\n \"source_fingerprint\",\n &format!(\"{:?}\", replay_contract.replay_anchor.source_fingerprint),\n &format!(\"{:?}\", batch.source_fingerprint),\n )?;\n check_match(\n \"time_view_id\",\n &replay_contract.replay_anchor.time_view_id,\n &batch.time_view_id,\n )?;\n\n let (signal_samples, _decision, _sidecar) = signal::project_facts(\n &batch.fact_rows,\n execution_spec,\n &batch.time_view_id,\n )?;\n\n let materialized = SignalBatch {\n batch_id: batch.batch_id.clone(),\n worldview_pin: execution_spec.worldview_pin.clone(),\n time_view_id: batch.time_view_id.clone(),\n source_fingerprint: batch.source_fingerprint.clone(),\n code_version: replay_contract.replay_anchor.code_version.clone(),\n symbol: batch.symbol.clone(),\n signal_samples: signal_samples,\n replay_anchor: replay_contract.replay_anchor.clone(),\n };\n\n let receipt = ReplayReceipt {\n batch_id: batch.batch_id.clone(),\n replay_contract_ref: contract_ref.clone(),\n };\n\n Ok((materialized, receipt))\n }\n\n pub fn get_batch(&self, batch_id: &BatchId) -> Result {\n self.journal\n .query_fact_batch(batch_id)\n .ok_or_else(|| ServiceError::BatchNotFound(batch_id.clone()))\n }\n}\n\nfn check_match(field: &str, expected: &str, actual: &str) -> Result<(), ServiceError> {\n if expected != actual {\n return Err(ServiceError::ReplayMismatch {\n field: field.to_owned(),\n expected: expected.to_owned(),\n actual: actual.to_owned(),\n });\n }\n Ok(())\n}\n\nfn execution_spec_fingerprint(spec: &WorldviewExecutionSpec) -> String {\n let mut entries = Vec::with_capacity(spec.signal_keys.len());\n for key_spec in &spec.signal_keys {\n let value_domain = format!(\"{:?}\", key_spec.value_domain);\n entries.push(format!(\n \"{}:{}|{}:{}|{}:{}\",\n key_spec.key.len(),\n key_spec.key,\n value_domain.len(),\n value_domain,\n key_spec.description.len(),\n key_spec.description\n ));\n }\n entries.sort_unstable();\n let mut canonical = format!(\"signal_keys_count={};\", entries.len());\n canonical.push_str(&entries.join(\";\"));\n format!(\"esf:v1:{:016x}\", fnv1a64(canonical.as_bytes()))\n}\n\nfn fnv1a64(input: &[u8]) -> u64 {\n let mut hash: u64 = 0xcbf29ce484222325;\n for byte in input {\n hash ^= u64::from(*byte);\n hash = hash.wrapping_mul(0x100000001b3);\n }\n hash\n}\n\nfn period_to_provider_str(period: &TimePeriod) -> &'static str {\n match period {\n TimePeriod::Min1 => \"1m\",\n TimePeriod::Min5 => \"5m\",\n TimePeriod::Min15 => \"15m\",\n TimePeriod::Min30 => \"30m\",\n TimePeriod::Hour1 => \"1h\",\n TimePeriod::Hour4 => \"4h\",\n TimePeriod::Daily => \"1d\",\n TimePeriod::Weekly => \"1w\",\n TimePeriod::Monthly => \"1M\",\n }\n}\n\nfn validate_source_fingerprint(fp: &SourceFingerprint) -> Result<(), ServiceError> {\n let s = fp.0.trim();\n if s.is_empty() {\n return Err(ServiceError::Journal(\n \"source_fingerprint must not be empty\".to_string(),\n ));\n }\n if s.contains('/')\n || s.contains('\\\\')\n || s.contains('#')\n || s.contains('\\n')\n || s.contains('\\t')\n {\n // 路径分隔符/注释符会破坏跨环境可复算性。\n return Err(ServiceError::Journal(\n \"source_fingerprint contains unstable path/comment characters\".to_string(),\n ));\n }\n Ok(())\n}\n\n#[cfg(test)]\nmod tests {\n use std::cell::RefCell;\n use std::collections::BTreeMap;\n use std::rc::Rc;\n\n use super::*;\n use crate::adapter::{RawRow, RawValue};\n use crate::model::{BatchId, FactBatch, IngestReceipt, ReplayContract, ReplayContractRef};\n use crate::fact::journal_port::{JournalPort, MemoryJournal};\n use materia::identity::{PinRef, Version, WorldviewId};\n use materia::worldview::{SignalKeySpec, ValueDomain};\n\n #[derive(Clone, Default)]\n struct SharedMemoryJournal(Rc>);\n\n impl JournalPort for SharedMemoryJournal {\n fn append_fact_batch(&mut self, batch: FactBatch) -> Result<(), String> {\n self.0.borrow_mut().append_fact_batch(batch)\n }\n\n fn append_ingest(&mut self, receipt: IngestReceipt) -> Result<(), String> {\n self.0.borrow_mut().append_ingest(receipt)\n }\n\n fn append_replay_contract(\n &mut self,\n contract: ReplayContract,\n ) -> Result {\n self.0.borrow_mut().append_replay_contract(contract)\n }\n\n fn query_fact_batch(&self, batch_id: &BatchId) -> Option {\n self.0.borrow().query_fact_batch(batch_id)\n }\n\n fn query_replay_contract(&self, contract_ref: &ReplayContractRef) -> Option {\n self.0.borrow().query_replay_contract(contract_ref)\n }\n\n fn query_ingest(&self, batch_id: &BatchId) -> Option {\n self.0.borrow().query_ingest(batch_id)\n }\n }\n\n fn live_execution_spec() -> WorldviewExecutionSpec {\n WorldviewExecutionSpec {\n worldview_pin: PinRef {\n id: WorldviewId(\"wv-live-okx\".to_string()),\n version: Version(1),\n },\n signal_keys: vec![\n SignalKeySpec {\n key: \"bar:open\".to_string(),\n value_domain: ValueDomain::Continuous,\n description: \"open;version_id=v1;catalog_ref=core-bars\".to_string(),\n },\n SignalKeySpec {\n key: \"bar:high\".to_string(),\n value_domain: ValueDomain::Continuous,\n description: \"high;version_id=v1;catalog_ref=core-bars\".to_string(),\n },\n SignalKeySpec {\n key: \"bar:low\".to_string(),\n value_domain: ValueDomain::Continuous,\n description: \"low;version_id=v1;catalog_ref=core-bars\".to_string(),\n },\n SignalKeySpec {\n key: \"bar:close\".to_string(),\n value_domain: ValueDomain::Continuous,\n description: \"close;version_id=v1;catalog_ref=core-bars\".to_string(),\n },\n SignalKeySpec {\n key: \"bar:volume\".to_string(),\n value_domain: ValueDomain::Continuous,\n description: \"volume;version_id=v1;catalog_ref=core-bars\".to_string(),\n },\n ],\n }\n }\n\n fn local_rows() -> Vec {\n let mut row1_fields = BTreeMap::new();\n row1_fields.insert(\"bar:open\".to_string(), RawValue::F64(1.0));\n row1_fields.insert(\"bar:high\".to_string(), RawValue::F64(1.2));\n row1_fields.insert(\"bar:low\".to_string(), RawValue::F64(0.9));\n row1_fields.insert(\"bar:close\".to_string(), RawValue::F64(1.1));\n row1_fields.insert(\"bar:volume\".to_string(), RawValue::F64(100.0));\n\n let mut row2_fields = BTreeMap::new();\n row2_fields.insert(\"bar:open\".to_string(), RawValue::F64(1.1));\n row2_fields.insert(\"bar:high\".to_string(), RawValue::F64(1.3));\n row2_fields.insert(\"bar:low\".to_string(), RawValue::F64(1.0));\n row2_fields.insert(\"bar:close\".to_string(), RawValue::F64(1.2));\n row2_fields.insert(\"bar:volume\".to_string(), RawValue::F64(120.0));\n\n vec![\n RawRow {\n adapter: \"okx\".to_string(),\n family: InterfaceFamily::Kline,\n symbol: \"BTCUSDT\".to_string(),\n timestamp_ms: 1_700_000_000_000,\n fields: row1_fields,\n },\n RawRow {\n adapter: \"okx\".to_string(),\n family: InterfaceFamily::Kline,\n symbol: \"BTCUSDT\".to_string(),\n timestamp_ms: 1_700_000_060_000,\n fields: row2_fields,\n },\n ]\n }\n\n #[test]\n fn replay_rejects_execution_spec_fingerprint_mismatch() {\n let mut service = IngestService::new();\n let ingest_receipt = service\n .ingest_from_rows(\n local_rows(),\n \"BTCUSDT\",\n \"tv/1m\".to_string(),\n SourceFingerprint(\"okx-local-btcusdt\".to_string()),\n CodeVersion(\"v0.1-local\".to_string()),\n )\n .expect(\"local ingest should succeed\");\n\n let spec = live_execution_spec();\n let (_materialized, materialize_receipt) = service\n .materialize(\n &ingest_receipt.batch_id,\n &spec,\n CodeVersion(\"v0.1-local\".to_string()),\n )\n .expect(\"materialize should succeed\");\n\n let mut drifted_spec = spec.clone();\n drifted_spec.signal_keys[0].description =\n \"open;version_id=v2;catalog_ref=core-bars\".to_string();\n\n let err = service\n .replay(&materialize_receipt.replay_contract_ref, &drifted_spec)\n .expect_err(\"replay should reject execution spec drift\");\n match err {\n ServiceError::ReplayMismatch { field, .. } => {\n assert_eq!(field, \"execution_spec_fingerprint\");\n }\n other => panic!(\"expected ReplayMismatch, got {other:?}\"),\n }\n }\n\n #[test]\n fn replay_works_across_service_instances_via_journal_fact_batch() {\n let shared_journal = SharedMemoryJournal::default();\n let mut writer = IngestService::with_journal(shared_journal.clone());\n let ingest_receipt = writer\n .ingest_from_rows(\n local_rows(),\n \"BTCUSDT\",\n \"tv/1m\".to_string(),\n SourceFingerprint(\"okx-local-btcusdt\".to_string()),\n CodeVersion(\"v0.1-local\".to_string()),\n )\n .expect(\"local ingest should succeed\");\n let spec = live_execution_spec();\n let (_materialized, materialize_receipt) = writer\n .materialize(\n &ingest_receipt.batch_id,\n &spec,\n CodeVersion(\"v0.1-local\".to_string()),\n )\n .expect(\"materialize should succeed\");\n\n let reader = IngestService::with_journal(shared_journal);\n let (replayed, replay_receipt) = reader\n .replay(&materialize_receipt.replay_contract_ref, &spec)\n .expect(\"replay should succeed across service instances\");\n\n assert_eq!(replayed.batch_id, ingest_receipt.batch_id);\n assert_eq!(\n replay_receipt.replay_contract_ref,\n materialize_receipt.replay_contract_ref\n );\n assert_eq!(replayed.signal_samples.len(), 10);\n }\n\n #[test]\n fn materialize_rejects_replay_contract_ref_conflict() {\n let mut service = IngestService::new();\n let ingest_receipt = service\n .ingest_from_rows(\n local_rows(),\n \"BTCUSDT\",\n \"tv/1m\".to_string(),\n SourceFingerprint(\"okx-local-btcusdt\".to_string()),\n CodeVersion(\"v0.1-local\".to_string()),\n )\n .expect(\"local ingest should succeed\");\n let spec = live_execution_spec();\n\n service\n .materialize(\n &ingest_receipt.batch_id,\n &spec,\n CodeVersion(\"v0.1-local\".to_string()),\n )\n .expect(\"first materialize should succeed\");\n\n let err = service\n .materialize(\n &ingest_receipt.batch_id,\n &spec,\n CodeVersion(\"v0.2-local\".to_string()),\n )\n .expect_err(\"second materialize with changed replay anchor must fail\");\n\n match err {\n ServiceError::Journal(msg) => {\n assert!(msg.contains(\"replay contract ref conflict\"));\n }\n other => panic!(\"expected Journal conflict error, got {other:?}\"),\n }\n }\n\n #[test]\n fn ingest_batch_rejects_non_utc_timezone() {\n let mut service = IngestService::new();\n let err = service\n .ingest_batch(\n \"okx\",\n InterfaceFamily::Kline,\n \"BTCUSDT\",\n TimePeriod::Daily,\n Timezone::AsiaShanghai,\n \"tv/1d\".to_string(),\n 5,\n SourceFingerprint(\"okx-live-btcusdt\".to_string()),\n CodeVersion(\"v0.1-live\".to_string()),\n )\n .expect_err(\"non-UTC timezone should be rejected before adapter pull\");\n\n assert!(\n err.to_string().contains(\"unsupported timezone\"),\n \"unexpected error: {err:?}\"\n );\n }\n\n #[test]\n #[ignore = \"requires external network and OKX public API availability\"]\n fn live_okx_ingest_materialize_replay() {\n let mut service = IngestService::new();\n let ingest_receipt = service\n .ingest_batch(\n \"okx\",\n InterfaceFamily::Kline,\n \"BTCUSDT\",\n TimePeriod::Daily,\n Timezone::Utc,\n \"tv/1d\".to_string(),\n 5,\n SourceFingerprint(\"okx-live-btcusdt\".to_string()),\n CodeVersion(\"v0.1-live\".to_string()),\n )\n .expect(\"okx ingest should return bars\");\n\n let spec = live_execution_spec();\n let (materialized, materialize_receipt) = service\n .materialize(\n &ingest_receipt.batch_id,\n &spec,\n CodeVersion(\"v0.1-live\".to_string()),\n )\n .expect(\"materialize should succeed\");\n\n assert!(!service.get_batch(&ingest_receipt.batch_id).unwrap().fact_rows.is_empty());\n assert_eq!(\n materialized.signal_samples.len(),\n service.get_batch(&ingest_receipt.batch_id).unwrap().fact_count * 5\n );\n\n let (replayed, replay_receipt) = service\n .replay(&materialize_receipt.replay_contract_ref, &spec)\n .expect(\"replay should succeed\");\n\n assert_eq!(replayed.signal_samples.len(), materialized.signal_samples.len());\n assert_eq!(\n replay_receipt.replay_contract_ref,\n materialize_receipt.replay_contract_ref\n );\n }\n}\n\n\n# market-ingest/src/signal/lineage_sidecar.rs (15 lines, ~95 tokens) [full]\n\nuse crate::model::WorldviewExecutionSpec;\n\n#[derive(Clone, Debug, PartialEq, Eq)]\npub struct LineageSidecar {\n pub worldview_version: u64,\n pub notes: Vec,\n}\n\npub fn build(spec: &WorldviewExecutionSpec) -> LineageSidecar {\n LineageSidecar {\n worldview_version: spec.worldview_pin.version.0,\n notes: vec![\"execution-sidecar\".to_string()],\n }\n}\n\n\n# market-ingest/src/AGENTS.md (82 lines, ~927 tokens) [full]\n\n# market-ingest/src 总体工作协议\n\n## 这个文件解决什么\n\n- 给 `src/` 提供统一入口说明。\n- 固定主链、边界和红线,避免跨目录语义漂移。\n- 细节实现去各子域 `AGENTS.md`,这里不重复展开。\n\n## 目录形状\n\n```text\nmarket-ingest/src\n├── adapter/ # 数据源差异收口;RawRow 输出;python host/registry/provider 实现\n│ ├── registry.rs # adapter + family 路由,来源字段校验\n│ ├── python_host.rs # Python 模块加载与调用\n│ └── python/ # okx/tickflow provider 代码与能力声明\n├── fact/ # 事实闭包:类型收敛 + 物理校验 + batch 组装 + journal 端口\n│ ├── raw_to_fact.rs # RawRow -> FactRow\n│ ├── row_invariants.rs # 时间戳/数值合法性校验\n│ ├── batch_builder.rs # BatchId + ReplayAnchor + IngestReceipt\n│ └── journal_port.rs # fact batch / ingest / replay contract 存取\n├── signal/ # 执行解释内核\n│ ├── spec_gate.rs # 规格充分性判定,缺 key 直接 reject\n│ ├── fact_to_signal.rs # FactRow -> SignalSample 映射与排序\n│ └── lineage_sidecar.rs # 执行侧证据\n├── model/ # 共享执行对象\n│ ├── refs.rs # BatchId/TimeViewId/TimePeriod/Timezone/ReplayAnchor\n│ ├── batch.rs # FactRow/FactBatch/SignalSample/SignalBatch/ExecutionSpec\n│ └── receipt.rs # ReplayContract + 各类 Receipt\n├── service.rs # ingest/materialize/replay 唯一编排入口\n└── lib.rs # 模块导出\n```\n\n## 模块职责\n\n- `adapter/`:来源差异收口,输出统一 `RawRow`。\n- `fact/`:`RawRow -> FactRow`,物理校验,组装 `FactBatch` 与 `ReplayAnchor`。\n- `signal/`:统一解释内核(`spec_gate + fact_to_signal`),输出 `SignalSample`。\n- `model/`:共享执行对象(refs/batch/receipt)。\n- `service.rs`:`ingest/materialize/replay` 唯一编排入口。\n\n## 主链调用\n\n- ingest\n - `service::ingest_batch | ingest_from_rows`\n - `adapter::fetch_raw_rows`\n - `fact::normalize_rows`\n - `fact::assemble_normalized_batch`\n - `journal.append_fact_batch + append_ingest`\n- materialize\n - `service::materialize`\n - `journal.query_fact_batch`\n - `signal::project_facts(spec_gate -> fact_to_signal)`\n - `build ReplayContract(execution_spec_fingerprint)`\n - `journal.append_replay_contract`\n - `SignalBatch(signal_samples)`\n- replay\n - `service::replay`\n - `journal.query_replay_contract`\n - 校验 `worldview_pin + execution_spec_fingerprint + source_fingerprint + time_view_id`\n - `signal::project_facts`\n - `ReplayReceipt`\n\n## 全局边界\n\n- `materia::Worldview` 是治理声明对象;`market-ingest` 只消费执行规格:`worldview_pin + signal_keys`。\n- 解释路径必须统一为 `gate + mapper`;禁止 worldview case-by-case 分支。\n- canonical key owner 在 adapter mapper;`fact/raw_to_fact` 不做二次 key 重命名。\n- 时间顺序 owner 在 adapter;fact/signal 不做第二套排序语义。\n\n## 非主链说明\n\n- `channel.rs` 当前不在主链调用图中。\n- 若接入主链,必须在本文件与对应子域文档同步更新调用路径。\n- 若不接入,保持隔离并避免被误用为隐式入口。\n\n## 红线\n\n- 不改治理语义,不把 worldview 解释逻辑拉进 ingest/adapter。\n- 不新增补偿层来掩盖边界错误;优先删除、合并、收紧边界。\n- replay 合同一致性锚必须保留并强校验:`execution_spec_fingerprint`。\n", "metadata": { - "truncated": false + "truncated": true }, - "title": "", + "title": "trimmed malformed todowrite fixture", "time": { "start": 1775181853170, "end": 1775181853197