Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 260 additions & 9 deletions src/crates/core/src/agentic/execution/round_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::util::errors::{BitFunError, BitFunResult};
use crate::util::types::Message as AIMessage;
use crate::util::types::ToolDefinition;
use dashmap::DashMap;
use log::{debug, error, warn};
use log::{debug, error, info, warn};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -481,10 +481,28 @@ impl RoundExecutor {
return Err(BitFunError::Cancelled("Execution cancelled".to_string()));
}

// ---- Write tool content generation ----
// For Write tool calls without a "content" field, spawn a separate AI
// request with the full session history to generate the file content as
// plain text wrapped in <bitfun_contents> tags. This avoids having the
// model emit large file contents inside JSON tool-call arguments, which
// is a major source of JSON parse failures.
let tool_calls = stream_result.tool_calls.clone();
let tool_calls = self
.generate_write_tool_contents(
ai_client.clone(),
&context,
&ai_messages,
tool_calls,
&cancel_token,
event_subagent_parent_info.clone(),
)
.await?;

// Execute tool calls
debug!(
"Preparing to execute tool calls: count={}",
stream_result.tool_calls.len()
tool_calls.len()
);

let tool_phase_started_at = Instant::now();
Expand Down Expand Up @@ -566,18 +584,17 @@ impl RoundExecutor {
// Execute tools — convert pipeline-level Err into per-tool error results
// so the model always receives a tool_result for every tool_call.
let execution_results = match tool_pipeline
.execute_tools(stream_result.tool_calls.clone(), tool_context, tool_options)
.execute_tools(tool_calls.clone(), tool_context, tool_options)
.await
{
Ok(results) => results,
Err(e) => {
error!(
"Tool pipeline execution failed, generating error results for all {} tool calls: {}",
stream_result.tool_calls.len(),
tool_calls.len(),
e
);
stream_result
.tool_calls
tool_calls
.iter()
.map(|tc| crate::agentic::tools::pipeline::ToolExecutionResult {
tool_id: tc.tool_id.clone(),
Expand Down Expand Up @@ -620,7 +637,7 @@ impl RoundExecutor {
let assistant_message = Message::assistant_with_reasoning(
reasoning,
stream_result.full_text.clone(),
stream_result.tool_calls.clone(),
tool_calls.clone(),
)
.with_turn_id(context.dialog_turn_id.clone())
.with_round_id(round_id.clone())
Expand Down Expand Up @@ -676,7 +693,7 @@ impl RoundExecutor {

Ok(RoundResult {
assistant_message,
tool_calls: stream_result.tool_calls.clone(),
tool_calls: tool_calls.clone(),
tool_result_messages,
has_more_rounds,
finish_reason: if has_more_rounds {
Expand Down Expand Up @@ -736,6 +753,193 @@ impl RoundExecutor {
}
}

/// Generate file content for Write tool calls that lack a `content` field.
///
/// When a Write tool call arrives without `content`, this method spawns a
/// separate AI request with the full session history and a directive to
/// output the file content as plain text inside `<bitfun_contents>` tags.
/// The extracted content is then injected into the tool call arguments so
/// the downstream Write tool execution proceeds as normal.
///
/// Rationale: emitting large file bodies inside JSON tool-call arguments is
/// a major source of JSON parse failures, so the content is fetched out of
/// band as plain text instead (see the call site in the round loop).
///
/// # Arguments
/// * `ai_client` - client used for the side-channel content request
/// * `context` - round context supplying session/turn ids for emitted events
/// * `ai_messages` - full session history the content request is appended to
/// * `tool_calls` - this round's tool calls; qualifying Write calls are
///   rewritten in place, all others pass through untouched
/// * `cancel_token` - checked before each request and on every stream chunk
/// * `subagent_parent_info` - forwarded on every emitted tool event so the
///   UI can attribute the tool card to the right (sub)agent
///
/// # Errors
/// Returns `BitFunError::Cancelled` when cancellation is observed, or
/// `BitFunError::AIClient` when a content-generation request fails to start.
/// A mid-stream error is NOT fatal: it is logged and the partial text
/// accumulated so far is used best-effort.
async fn generate_write_tool_contents(
    &self,
    ai_client: Arc<AIClient>,
    context: &RoundContext,
    ai_messages: &[AIMessage],
    mut tool_calls: Vec<ToolCall>,
    cancel_token: &CancellationToken,
    subagent_parent_info: Option<crate::agentic::events::SubagentParentInfo>,
) -> BitFunResult<Vec<ToolCall>> {
    // Find indices of Write tool calls that need content generation.
    // A call qualifies only if it already names a target file (`file_path`
    // is a string) but has no `content` field at all. Note that
    // `Value::get` returns None on non-objects, so a qualifying call's
    // `arguments` is guaranteed to be a JSON object (relied on below).
    let write_indices: Vec<usize> = tool_calls
        .iter()
        .enumerate()
        .filter(|(_, tc)| {
            tc.tool_name == "Write"
                && tc.arguments.get("content").is_none()
                && tc.arguments.get("file_path").and_then(|v| v.as_str()).is_some()
        })
        .map(|(i, _)| i)
        .collect();

    // Fast path: nothing to generate, hand the calls back unchanged.
    if write_indices.is_empty() {
        return Ok(tool_calls);
    }

    info!(
        "Generating content for {} Write tool call(s) via separate AI request",
        write_indices.len()
    );

    // Calls are processed sequentially, one AI request each.
    for idx in &write_indices {
        // Re-check cancellation before spending another full AI request.
        if cancel_token.is_cancelled() {
            return Err(BitFunError::Cancelled("Execution cancelled".to_string()));
        }

        let tc = &tool_calls[*idx];
        // `unwrap_or("")` is unreachable in practice: the filter above
        // required `file_path` to be a string.
        let file_path = tc
            .arguments
            .get("file_path")
            .and_then(|v| v.as_str())
            .unwrap_or("")
            .to_string();
        let tool_id = tc.tool_id.clone();

        // Emit Started event so the UI can show the tool card immediately,
        // before any content has streamed in.
        self.emit_event(
            AgenticEvent::ToolEvent {
                session_id: context.session_id.clone(),
                turn_id: context.dialog_turn_id.clone(),
                tool_event: ToolEventData::Started {
                    tool_id: tool_id.clone(),
                    tool_name: "Write".to_string(),
                    params: tc.arguments.clone(),
                    timeout_seconds: None,
                },
                subagent_parent_info: subagent_parent_info.clone(),
            },
            EventPriority::High,
        )
        .await;

        // Build a content-generation prompt. The prompt deliberately ends
        // with an already-opened `<bitfun_contents>` tag to prime the model
        // into emitting raw content directly — which means the response may
        // contain only the CLOSING tag (extract_bitfun_contents handles
        // every tag combination).
        let content_prompt = format!(
            "Now output the complete file content for the file `{}`. \
            Output ONLY the raw file content wrapped in <bitfun_contents> tags. \
            Do NOT include any other text, explanation, or commentary outside the tags.\n\
            <bitfun_contents>\n",
            file_path
        );

        let mut content_messages = ai_messages.to_vec();
        content_messages.push(AIMessage::user(content_prompt));

        // Send the content-generation request (no tools, pure text output).
        let full_text = match ai_client
            .send_message_stream(content_messages, None)
            .await
        {
            Ok(response) => {
                let mut text = String::new();
                let mut stream = response.stream;
                use futures::StreamExt;
                while let Some(chunk) = stream.next().await {
                    // Cancellation is honored mid-stream so a long file
                    // generation can be aborted promptly.
                    if cancel_token.is_cancelled() {
                        return Err(BitFunError::Cancelled("Execution cancelled".to_string()));
                    }
                    match chunk {
                        Ok(resp) => {
                            let chunk_text = resp.text.unwrap_or_default();
                            if !chunk_text.is_empty() {
                                text.push_str(&chunk_text);

                                // Emit streaming ParamsPartial so the UI
                                // shows a live content preview. NOTE: this
                                // re-serializes the full accumulated text on
                                // every chunk (O(n) per chunk).
                                let params = serde_json::json!({
                                    "file_path": &file_path,
                                    "content": &text,
                                });
                                self.emit_event(
                                    AgenticEvent::ToolEvent {
                                        session_id: context.session_id.clone(),
                                        turn_id: context.dialog_turn_id.clone(),
                                        tool_event: ToolEventData::ParamsPartial {
                                            tool_id: tool_id.clone(),
                                            tool_name: "Write".to_string(),
                                            params: params.to_string(),
                                        },
                                        subagent_parent_info: subagent_parent_info.clone(),
                                    },
                                    EventPriority::Normal,
                                )
                                .await;
                            }
                        }
                        Err(e) => {
                            // Best-effort: keep whatever text arrived before
                            // the stream error instead of failing the round.
                            error!("Error in Write content generation stream: {}", e);
                            break;
                        }
                    }
                }
                text
            }
            Err(e) => {
                // Request never started — nothing to salvage, fail hard.
                error!("Write content generation request failed: {}", e);
                return Err(BitFunError::AIClient(format!(
                    "Write content generation failed for {}: {}",
                    file_path, e
                )));
            }
        };

        // Strip the <bitfun_contents> wrapper (with fallbacks for partial /
        // missing tags). Empty content is allowed but logged: it may be a
        // legitimately empty file or a model that produced nothing.
        let content = extract_bitfun_contents(&full_text);
        if content.is_empty() {
            warn!(
                "Write content generation returned empty content for file_path={}",
                file_path
            );
        }

        // Emit a final ParamsPartial carrying the fully-extracted content so
        // the UI preview settles on exactly what will be written.
        let final_params = serde_json::json!({
            "file_path": &file_path,
            "content": &content,
        });
        self.emit_event(
            AgenticEvent::ToolEvent {
                session_id: context.session_id.clone(),
                turn_id: context.dialog_turn_id.clone(),
                tool_event: ToolEventData::ParamsPartial {
                    tool_id: tool_id.clone(),
                    tool_name: "Write".to_string(),
                    params: final_params.to_string(),
                },
                subagent_parent_info: subagent_parent_info.clone(),
            },
            EventPriority::Normal,
        )
        .await;

        // Inject content into the tool call arguments.
        // The expect cannot fire: the qualifying filter above proved
        // `arguments` is a JSON object (Value::get succeeds only on objects).
        tool_calls[*idx]
            .arguments
            .as_object_mut()
            .expect("Write tool arguments must be a JSON object")
            .insert("content".to_string(), serde_json::Value::String(content));

        debug!(
            "Write content generated: file_path={}, content_len={}",
            file_path,
            tool_calls[*idx]
                .arguments
                .get("content")
                .and_then(|v| v.as_str())
                .map(|s| s.len())
                .unwrap_or(0)
        );
    }

    Ok(tool_calls)
}

/// Emit event
async fn emit_event(&self, event: AgenticEvent, priority: EventPriority) {
let _ = self.event_queue.enqueue(event, Some(priority)).await;
Expand Down Expand Up @@ -889,9 +1093,32 @@ fn token_details_from_usage(
(!details.is_empty()).then_some(serde_json::Value::Object(details))
}

/// Extract content from `<bitfun_contents>...</bitfun_contents>` tags.
///
/// Handles every shape a model response can take:
/// - both tags present: returns the text between them (trimmed);
/// - opening tag only: returns everything after it (the stream may have been
///   cut short or the model forgot to close the tag);
/// - closing tag only: returns everything before it (trimmed). This case is
///   expected by design: the generation prompt already primes the model with
///   an opened `<bitfun_contents>` tag, so a compliant model emits just the
///   raw content followed by the closing tag;
/// - no tags at all: returns the full text trimmed (fallback for models that
///   ignore the tag instruction).
fn extract_bitfun_contents(text: &str) -> String {
    const OPEN_TAG: &str = "<bitfun_contents>";
    const CLOSE_TAG: &str = "</bitfun_contents>";

    if let Some(start) = text.find(OPEN_TAG) {
        let content_start = start + OPEN_TAG.len();
        return match text[content_start..].find(CLOSE_TAG) {
            // Normal case: content sits between the two tags.
            Some(end) => text[content_start..content_start + end].trim().to_string(),
            // Opening tag found but no closing tag — take everything after
            // the opening tag (truncated stream or missing close).
            None => text[content_start..].trim().to_string(),
        };
    }

    // No opening tag. If a closing tag is present the model continued from
    // the primed opening tag in the prompt — keep only what precedes the
    // close, otherwise the literal `</bitfun_contents>` would leak into the
    // written file.
    if let Some(end) = text.find(CLOSE_TAG) {
        return text[..end].trim().to_string();
    }

    // No tags at all — return the full text as a fallback.
    text.trim().to_string()
}

#[cfg(test)]
mod tests {
use super::{RoundExecutor, StreamProcessor};
use super::{extract_bitfun_contents, RoundExecutor, StreamProcessor};
use crate::agentic::events::{EventQueue, EventQueueConfig};
use dashmap::DashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -1020,4 +1247,28 @@ mod tests {
"I can help with that."
));
}

#[test]
fn extract_bitfun_contents_with_tags() {
    // Text between the tags comes back trimmed; preamble and trailer are dropped.
    assert_eq!(
        extract_bitfun_contents(
            "Some preamble\n<bitfun_contents>\nfn main() {}\n</bitfun_contents>\nSome trailing"
        ),
        "fn main() {}"
    );
}

#[test]
fn extract_bitfun_contents_without_tags_fallback() {
    // A tag-less response falls back to the whole (trimmed) text.
    assert_eq!(extract_bitfun_contents("fn main() {}"), "fn main() {}");
}

#[test]
fn extract_bitfun_contents_open_tag_only() {
    // An unclosed tag yields everything after the opener.
    assert_eq!(
        extract_bitfun_contents("<bitfun_contents>\nfn main() {}"),
        "fn main() {}"
    );
}

#[test]
fn extract_bitfun_contents_empty() {
    // Adjacent open/close tags carry no payload at all.
    assert_eq!(
        extract_bitfun_contents("<bitfun_contents></bitfun_contents>"),
        ""
    );
}
}
Loading
Loading