-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathstreaming.rs
More file actions
240 lines (198 loc) · 7.56 KB
/
streaming.rs
File metadata and controls
240 lines (198 loc) · 7.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
//! Example demonstrating streaming output from Docker commands
//!
//! This example shows how to use the streaming API to get real-time output
//! from Docker commands like build, run, and logs.
//!
//! Run with: cargo run --example streaming
use docker_wrapper::command::DockerCommand;
use docker_wrapper::{BuildCommand, LogsCommand, RunCommand};
use docker_wrapper::{OutputLine, StreamHandler, StreamableCommand};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// Entry point: prints a banner, then runs the four streaming demos in order.
///
/// Each demo is awaited to completion before the next one starts. The first
/// demo that returns an error ends the program with that error.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Docker Streaming Example");
    println!("========================\n");

    // 1. Build output, printed as it arrives.
    example_build_streaming().await?;

    // 2. Output of a running container, handled by a closure.
    example_run_streaming().await?;

    // 3. Container logs, filtered per output stream.
    example_logs_filtering().await?;

    // 4. Output delivered through a channel.
    example_channel_streaming().await?;

    println!("\n✨ All streaming examples completed!");
    Ok(())
}
/// Example 1: builds a small throwaway image and prints the build output as it streams.
///
/// Writes a temporary `Dockerfile.streaming` into the current directory, builds it
/// with the tag `streaming-example:latest` using `StreamHandler::print()` as the
/// output handler, reports the outcome, and deletes the temporary Dockerfile.
///
/// # Errors
///
/// Returns an error if the Dockerfile cannot be written or removed, or if the
/// streamed build command itself returns an error. The temporary Dockerfile is
/// removed in either case.
async fn example_build_streaming() -> Result<(), Box<dyn std::error::Error>> {
    const DOCKERFILE: &str = "Dockerfile.streaming";

    println!("📦 Example 1: Streaming Docker Build Output");
    println!("-------------------------------------------");

    // Create a simple Dockerfile for testing
    std::fs::write(
        DOCKERFILE,
        r#"
FROM alpine:latest
RUN echo "Step 1: Installing packages..."
RUN echo "Step 2: Setting up application..."
RUN echo "Step 3: Configuring environment..."
CMD ["echo", "Build complete!"]
"#,
    )?;

    println!("Building image with streaming output...\n");

    // Deliberately no `?` on this await: the temporary Dockerfile has to be
    // removed before a build error is propagated, otherwise a failed build
    // would leave the file behind in the working directory.
    let build_outcome = BuildCommand::new(".")
        .file(DOCKERFILE)
        .tag("streaming-example:latest")
        .stream(StreamHandler::print())
        .await;

    // Clean up first; the cleanup error (if any) is surfaced below, after the
    // build error, which takes precedence.
    let cleanup = std::fs::remove_file(DOCKERFILE);

    let result = build_outcome?;
    if result.is_success() {
        println!("\n✅ Build completed successfully!");
    } else {
        println!("\n❌ Build failed with exit code: {}", result.exit_code);
    }

    cleanup?;
    Ok(())
}
async fn example_run_streaming() -> Result<(), Box<dyn std::error::Error>> {
println!("\n🚀 Example 2: Streaming Container Output");
println!("----------------------------------------");
// Run a container that produces output over time
println!("Running container with streaming output...\n");
let line_count = Arc::new(AtomicUsize::new(0));
let count_clone = line_count.clone();
let result = RunCommand::new("alpine")
.cmd(vec![
"sh".to_string(),
"-c".to_string(),
"for i in 1 2 3 4 5; do echo \"Line $i\"; sleep 0.1; done".to_string(),
])
.remove() // Remove container after exit
.stream(move |line| match line {
OutputLine::Stdout(text) => {
println!("Container: {}", text);
count_clone.fetch_add(1, Ordering::SeqCst);
}
OutputLine::Stderr(text) => {
eprintln!("Container Error: {}", text);
}
})
.await?;
println!("\n✅ Container exited with code: {}", result.exit_code);
println!(
" Processed {} lines of output",
line_count.load(Ordering::SeqCst)
);
Ok(())
}
/// Example 3: follows the logs of a detached container and filters them per stream.
///
/// Starts a detached `alpine` container named `streaming-log-example` that, once
/// per second for five rounds, writes a "Log entry" line to stdout and an "Error"
/// line to stderr. The logs are then followed with a handler that prints and
/// counts only stdout lines containing "Log entry" and stderr lines containing
/// "Error". Finally the counts are reported and the container is stopped.
///
/// # Errors
///
/// Returns an error if starting the container fails. A failure while streaming
/// the logs is reported on stderr but does not abort the example, so the
/// container cleanup below still runs.
async fn example_logs_filtering() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n📜 Example 3: Streaming Logs with Filtering");
    println!("-------------------------------------------");

    // First, create a container that generates logs
    println!("Creating a container for log streaming...");
    let container_name = "streaming-log-example";

    // Run a container in detached mode that generates logs
    RunCommand::new("alpine")
        .name(container_name)
        .detach()
        .remove()
        .cmd(vec![
            "sh".to_string(),
            "-c".to_string(),
            "for i in 1 2 3 4 5; do echo \"Log entry $i\"; echo \"Error: Something went wrong $i\" >&2; sleep 1; done".to_string(),
        ])
        .execute()
        .await?;

    println!("Streaming logs with custom filtering...\n");

    // One counter per stream; the clones move into the handler closure while
    // the originals stay here for the summary.
    let error_count = Arc::new(AtomicUsize::new(0));
    let info_count = Arc::new(AtomicUsize::new(0));
    let error_clone = Arc::clone(&error_count);
    let info_clone = Arc::clone(&info_count);

    // No `?` here: a streaming failure must not skip the cleanup further down.
    let stream_outcome = LogsCommand::new(container_name)
        .follow()
        .timestamps()
        .tail("all")
        .stream(move |line| match line {
            OutputLine::Stdout(text) => {
                if text.contains("Log entry") {
                    println!("[INFO] {}", text);
                    info_clone.fetch_add(1, Ordering::SeqCst);
                }
            }
            OutputLine::Stderr(text) => {
                if text.contains("Error") {
                    eprintln!("[ERROR] {}", text);
                    error_clone.fetch_add(1, Ordering::SeqCst);
                }
            }
        })
        .await;

    // Note: The logs command will continue until the container exits.
    // Report a failure instead of silently claiming success.
    match stream_outcome {
        Ok(_) => println!("\n✅ Log streaming completed"),
        Err(err) => eprintln!("\n⚠️ Log streaming failed: {}", err),
    }
    println!(" Info messages: {}", info_count.load(Ordering::SeqCst));
    println!(" Error messages: {}", error_count.load(Ordering::SeqCst));

    // Stop the container. Best-effort on purpose: it was started with
    // `.remove()` and has normally already exited by the time the followed
    // logs end, so a failing `docker stop` here is not treated as an error.
    let _ = std::process::Command::new("docker")
        .args(["stop", container_name])
        .output();

    Ok(())
}
async fn example_channel_streaming() -> Result<(), Box<dyn std::error::Error>> {
println!("\n📡 Example 4: Channel-based Streaming");
println!("-------------------------------------");
println!("Using channel to process output asynchronously...\n");
// Run a command that produces output
let (mut rx, _result) = RunCommand::new("alpine")
.cmd(vec![
"sh".to_string(),
"-c".to_string(),
"for i in 1 2 3; do echo \"Data: $i\"; sleep 0.5; done".to_string(),
])
.remove()
.stream_channel()
.await?;
// Process output from channel in a separate task
let processor = tokio::spawn(async move {
let mut lines = Vec::new();
while let Some(line) = rx.recv().await {
match line {
OutputLine::Stdout(text) => {
println!("Received via channel: {}", text);
lines.push(text);
}
OutputLine::Stderr(text) => {
eprintln!("Error via channel: {}", text);
}
}
}
lines
});
// Wait for both the command and processor to complete
let lines = processor.await?;
println!("\n✅ Channel streaming completed");
println!(" Collected {} lines via channel", lines.len());
Ok(())
}
// Additional example: Progress tracking during build
/// Bonus example: derives a rough percentage from streamed build output.
///
/// Builds the current directory as `progress-example:latest`. Every stdout line
/// containing "Step" advances a counter and is printed with a percentage based
/// on a fixed estimate of five steps; other stdout lines are printed as-is and
/// stderr lines are forwarded to stderr. The outcome of the build is reported
/// at the end.
///
/// # Errors
///
/// Returns an error if the streamed build command returns one.
// Not called from `main`, so the dead-code lint is silenced for this function.
#[allow(dead_code)]
async fn example_build_progress() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n🏗️ Bonus: Build Progress Tracking");
    println!("---------------------------------");

    let mut current_step = 0;
    // Fixed estimate; the real number of matching lines may differ.
    let total_steps = 5;

    let result = BuildCommand::new(".")
        .tag("progress-example:latest")
        .stream(move |line| match line {
            OutputLine::Stdout(text) => {
                if text.contains("Step") {
                    current_step += 1;
                    // `total_steps` is only an estimate, so cap the value: more
                    // than five matching lines must not show more than 100%.
                    let progress =
                        ((current_step as f32 / total_steps as f32) * 100.0).min(100.0);
                    println!("[{:.0}%] {}", progress, text);
                } else {
                    println!(" {}", text);
                }
            }
            // Keep stderr output visible instead of silently dropping it.
            OutputLine::Stderr(text) => eprintln!(" {}", text),
        })
        .await?;

    if result.is_success() {
        println!("\n✅ Build completed with progress tracking!");
    } else {
        // Same failure report as the plain build example.
        println!("\n❌ Build failed with exit code: {}", result.exit_code);
    }

    Ok(())
}