Streaming Chat Example
This example demonstrates how to use streaming responses for real-time output.
Basic Streaming
use vllm_client::{VllmClient, json, StreamEvent}; use futures::StreamExt; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let client = VllmClient::new("http://localhost:8000/v1"); let mut stream = client .chat .completions() .create() .model("Qwen/Qwen2.5-7B-Instruct") .messages(json!([ {"role": "user", "content": "Write a short story about a robot learning to paint."} ])) .temperature(0.8) .max_tokens(1024) .stream(true) .send_stream() .await?; print!("Response: "); while let Some(event) = stream.next().await { match event { StreamEvent::Content(delta) => { print!("{}", delta); std::io::Write::flush(&mut std::io::stdout()).ok(); } StreamEvent::Done => break, StreamEvent::Error(e) => { eprintln!("\nError: {}", e); break; } _ => {} } } println!(); Ok(()) }
Streaming with Reasoning (Thinking Models)
For models that support thinking/reasoning mode:
use vllm_client::{VllmClient, json, StreamEvent}; use futures::StreamExt; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let client = VllmClient::new("http://localhost:8000/v1"); let mut stream = client .chat .completions() .create() .model("Qwen/Qwen2.5-7B-Instruct") .messages(json!([ {"role": "user", "content": "Solve: What is 15 * 23 + 47?"} ])) .extra(json!({ "chat_template_kwargs": { "think_mode": true } })) .stream(true) .send_stream() .await?; let mut reasoning = String::new(); let mut content = String::new(); while let Some(event) = stream.next().await { match event { StreamEvent::Reasoning(delta) => { reasoning.push_str(&delta); eprintln!("[thinking] {}", delta); } StreamEvent::Content(delta) => { content.push_str(&delta); print!("{}", delta); std::io::Write::flush(&mut std::io::stdout()).ok(); } StreamEvent::Done => break, StreamEvent::Error(e) => { eprintln!("\nError: {}", e); break; } _ => {} } } println!("\n"); if !reasoning.is_empty() { println!("--- Reasoning Process ---"); println!("{}", reasoning); } Ok(()) }
Streaming with Progress Indicator
Add a typing indicator while waiting for the first token:
use vllm_client::{VllmClient, json, StreamEvent}; use futures::StreamExt; use std::time::{Duration, Instant}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let client = VllmClient::new("http://localhost:8000/v1"); let waiting = Arc::new(AtomicBool::new(true)); let waiting_clone = Arc::clone(&waiting); // Spawn typing indicator task let indicator = tokio::spawn(async move { let chars = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']; let mut i = 0; while waiting_clone.load(Ordering::Relaxed) { print!("\r{} Thinking...", chars[i]); std::io::Write::flush(&mut std::io::stdout()).ok(); i = (i + 1) % chars.len(); tokio::time::sleep(Duration::from_millis(80)).await; } print!("\r \r"); // Clear the indicator }); let mut stream = client .chat .completions() .create() .model("Qwen/Qwen2.5-7B-Instruct") .messages(json!([ {"role": "user", "content": "Explain quantum entanglement in simple terms."} ])) .stream(true) .send_stream() .await?; let mut first_token = true; let mut content = String::new(); while let Some(event) = stream.next().await { match event { StreamEvent::Content(delta) => { if first_token { waiting.store(false, Ordering::Relaxed); indicator.await.ok(); first_token = false; println!("Response:"); println!("---------"); } content.push_str(&delta); print!("{}", delta); std::io::Write::flush(&mut std::io::stdout()).ok(); } StreamEvent::Done => break, StreamEvent::Error(e) => { waiting.store(false, Ordering::Relaxed); eprintln!("\nError: {}", e); break; } _ => {} } } println!("\n"); Ok(()) }
Multi-turn Streaming Conversation
Handle a conversation with streaming responses:
use vllm_client::{VllmClient, json, StreamEvent}; use futures::StreamExt; use std::io::{self, BufRead, Write}; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let client = VllmClient::new("http://localhost:8000/v1"); let mut messages: Vec<serde_json::Value> = Vec::new(); println!("Chat with the AI (type 'quit' to exit)"); println!("----------------------------------------\n"); let stdin = io::stdin(); for line in stdin.lock().lines() { let input = line?; if input.trim() == "quit" { break; } if input.trim().is_empty() { continue; } // Add user message messages.push(json!({"role": "user", "content": input})); // Stream response let mut stream = client .chat .completions() .create() .model("Qwen/Qwen2.5-7B-Instruct") .messages(json!(messages)) .stream(true) .send_stream() .await?; print!("AI: "); io::stdout().flush().ok(); let mut response_content = String::new(); while let Some(event) = stream.next().await { match event { StreamEvent::Content(delta) => { response_content.push_str(&delta); print!("{}", delta); io::stdout().flush().ok(); } StreamEvent::Done => break, StreamEvent::Error(e) => { eprintln!("\nError: {}", e); break; } _ => {} } } println!("\n"); // Add assistant response to history messages.push(json!({"role": "assistant", "content": response_content})); } println!("Goodbye!"); Ok(()) }
Streaming with Timeout
Add timeout handling for slow responses:
use vllm_client::{VllmClient, json, StreamEvent, VllmError}; use futures::StreamExt; use tokio::time::{timeout, Duration}; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let client = VllmClient::new("http://localhost:8000/v1") .timeout_secs(300); let mut stream = client .chat .completions() .create() .model("Qwen/Qwen2.5-7B-Instruct") .messages(json!([ {"role": "user", "content": "Write a detailed essay about AI."} ])) .stream(true) .send_stream() .await?; let mut content = String::new(); loop { // 30 second timeout per event match timeout(Duration::from_secs(30), stream.next()).await { Ok(Some(event)) => { match event { StreamEvent::Content(delta) => { content.push_str(&delta); print!("{}", delta); std::io::Write::flush(&mut std::io::stdout()).ok(); } StreamEvent::Done => break, StreamEvent::Error(e) => { eprintln!("\nStream error: {}", e); return Err(e.into()); } _ => {} } } Ok(None) => break, Err(_) => { eprintln!("\nTimeout waiting for next token"); break; } } } println!("\n\nGenerated {} characters", content.len()); Ok(()) }
Collecting Usage Statistics
Track token usage during streaming:
use vllm_client::{VllmClient, json, StreamEvent, Usage}; use futures::StreamExt; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let client = VllmClient::new("http://localhost:8000/v1"); let mut stream = client .chat .completions() .create() .model("Qwen/Qwen2.5-7B-Instruct") .messages(json!([ {"role": "user", "content": "Write a poem about the ocean."} ])) .stream(true) .send_stream() .await?; let mut content = String::new(); let mut usage: Option<Usage> = None; let mut start_time = std::time::Instant::now(); let mut token_count = 0; while let Some(event) = stream.next().await { match event { StreamEvent::Content(delta) => { content.push_str(&delta); token_count += 1; print!("{}", delta); std::io::Write::flush(&mut std::io::stdout()).ok(); } StreamEvent::Usage(u) => { usage = Some(u); } StreamEvent::Done => break, _ => {} } } let elapsed = start_time.elapsed(); println!("\n"); println!("--- Statistics ---"); println!("Time: {:.2}s", elapsed.as_secs_f64()); println!("Characters: {}", content.len()); if let Some(usage) = usage { println!("Prompt tokens: {}", usage.prompt_tokens); println!("Completion tokens: {}", usage.completion_tokens); println!("Total tokens: {}", usage.total_tokens); println!("Tokens/second: {:.2}", usage.completion_tokens as f64 / elapsed.as_secs_f64()); } Ok(()) }
See Also
- Basic Chat - Simple chat completion
- Tool Calling - Function calling examples
- Streaming API - Streaming API reference