Streaming API

Streaming responses allow you to process LLM output in real-time, token by token, instead of waiting for the complete response.

Overview

vLLM Client provides streaming support through Server-Sent Events (SSE). Use send_stream() instead of send() to get a streaming response.

Basic Streaming

use vllm_client::{VllmClient, json, StreamEvent};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = VllmClient::new("http://localhost:8000/v1");

    let mut stream = client
        .chat
        .completions()
        .create()
        .model("Qwen/Qwen2.5-7B-Instruct")
        .messages(json!([
            {"role": "user", "content": "Write a poem about spring"}
        ]))
        .stream(true)
        .send_stream()
        .await?;

    while let Some(event) = stream.next().await {
        match event {
            StreamEvent::Content(delta) => print!("{}", delta),
            StreamEvent::Done => break,
            _ => {}
        }
    }

    println!();
    Ok(())
}

StreamEvent Types

The StreamEvent enum represents different types of streaming events:

VariantDescription
Content(String)Regular content token delta
Reasoning(String)Reasoning/thinking content (for thinking models)
ToolCallDeltaStreaming tool call delta
ToolCallComplete(ToolCall)Complete tool call ready to execute
Usage(Usage)Token usage statistics
DoneStream completed successfully
Error(VllmError)An error occurred

Content Events

The most common event type, containing text tokens:

#![allow(unused)]
fn main() {
match event {
    StreamEvent::Content(delta) => {
        print!("{}", delta);
        std::io::Write::flush(&mut std::io::stdout()).ok();
    }
    _ => {}
}
}

Reasoning Events

For models with reasoning capabilities (like Qwen with thinking mode):

#![allow(unused)]
fn main() {
match event {
    StreamEvent::Reasoning(delta) => {
        eprintln!("[thinking] {}", delta);
    }
    StreamEvent::Content(delta) => {
        print!("{}", delta);
    }
    _ => {}
}
}

Tool Call Events

Tool calls are streamed incrementally and then completed:

#![allow(unused)]
fn main() {
match event {
    StreamEvent::ToolCallDelta { index, id, name, arguments } => {
        println!("Tool delta: index={}, name={}", index, name);
        // Arguments are streamed as partial JSON
    }
    StreamEvent::ToolCallComplete(tool_call) => {
        println!("Tool ready: {}({})", tool_call.name, tool_call.arguments);
        // Execute the tool and return result
    }
    _ => {}
}
}

Usage Events

Token usage information is typically sent at the end:

#![allow(unused)]
fn main() {
match event {
    StreamEvent::Usage(usage) => {
        println!("Tokens: prompt={}, completion={}, total={}",
            usage.prompt_tokens,
            usage.completion_tokens,
            usage.total_tokens
        );
    }
    _ => {}
}
}

MessageStream

The MessageStream type is an async iterator that yields StreamEvent values.

Methods

MethodReturn TypeDescription
next()Option<StreamEvent>Get next event (async)
collect_content()StringCollect all content into a string
into_stream()impl StreamConvert to generic stream

Collect All Content

For convenience, you can collect all content at once:

#![allow(unused)]
fn main() {
let content = stream.collect_content().await?;
println!("Full response: {}", content);
}

Note: This waits for the complete response, defeating the purpose of streaming. Use only when you need both streaming display and the full text.

Complete Streaming Example

use vllm_client::{VllmClient, json, StreamEvent, VllmError};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), VllmError> {
    let client = VllmClient::new("http://localhost:8000/v1");

    let mut stream = client
        .chat
        .completions()
        .create()
        .model("Qwen/Qwen2.5-7B-Instruct")
        .messages(json!([
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Explain quantum computing in simple terms"}
        ]))
        .temperature(0.7)
        .max_tokens(1024)
        .stream(true)
        .send_stream()
        .await?;

    let mut reasoning = String::new();
    let mut content = String::new();
    let mut usage = None;

    while let Some(event) = stream.next().await {
        match event {
            StreamEvent::Reasoning(delta) => {
                reasoning.push_str(&delta);
            }
            StreamEvent::Content(delta) => {
                content.push_str(&delta);
                print!("{}", delta);
                std::io::Write::flush(&mut std::io::stdout()).ok();
            }
            StreamEvent::Usage(u) => {
                usage = Some(u);
            }
            StreamEvent::Done => {
                println!("\n[Stream completed]");
            }
            StreamEvent::Error(e) => {
                eprintln!("\nError: {}", e);
                return Err(e);
            }
            _ => {}
        }
    }

    // Print summary
    if !reasoning.is_empty() {
        eprintln!("\n--- Reasoning ---");
        eprintln!("{}", reasoning);
    }

    if let Some(usage) = usage {
        eprintln!("\n--- Token Usage ---");
        eprintln!("Prompt: {}, Completion: {}, Total: {}",
            usage.prompt_tokens,
            usage.completion_tokens,
            usage.total_tokens
        );
    }

    Ok(())
}

Streaming with Tool Calling

When streaming with tools, you'll receive incremental tool call updates:

#![allow(unused)]
fn main() {
use vllm_client::{VllmClient, json, StreamEvent, ToolCall};
use futures::StreamExt;

let tools = json!([
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get weather for a location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    }
]);

let mut stream = client
    .chat
    .completions()
    .create()
    .model("Qwen/Qwen2.5-7B-Instruct")
    .messages(json!([
        {"role": "user", "content": "What's the weather in Tokyo?"}
    ]))
    .tools(tools)
    .stream(true)
    .send_stream()
    .await?;

let mut tool_calls: Vec<ToolCall> = Vec::new();

while let Some(event) = stream.next().await {
    match event {
        StreamEvent::Content(delta) => print!("{}", delta),
        StreamEvent::ToolCallComplete(tool_call) => {
            tool_calls.push(tool_call);
        }
        StreamEvent::Done => break,
        _ => {}
    }
}

// Execute tool calls
for tool_call in tool_calls {
    println!("Tool: {} with args: {}", tool_call.name, tool_call.arguments);
    // Execute and return result in next message
}
}

Error Handling

Streaming errors can occur at any point:

#![allow(unused)]
fn main() {
use vllm_client::{VllmClient, json, StreamEvent, VllmError};
use futures::StreamExt;

async fn stream_chat(prompt: &str) -> Result<String, VllmError> {
    let client = VllmClient::new("http://localhost:8000/v1");
    
    let mut stream = client
        .chat
        .completions()
        .create()
        .model("Qwen/Qwen2.5-7B-Instruct")
        .messages(json!([{"role": "user", "content": prompt}]))
        .stream(true)
        .send_stream()
        .await?;

    let mut content = String::new();

    while let Some(event) = stream.next().await {
        match event {
            StreamEvent::Content(delta) => content.push_str(&delta),
            StreamEvent::Error(e) => return Err(e),
            StreamEvent::Done => break,
            _ => {}
        }
    }

    Ok(content)
}
}

Best Practices

Flush Output

For real-time display, flush stdout after each token:

#![allow(unused)]
fn main() {
use std::io::{self, Write};

match event {
    StreamEvent::Content(delta) => {
        print!("{}", delta);
        io::stdout().flush().ok();
    }
    _ => {}
}
}

Handle Interruption

For interactive applications, handle Ctrl+C gracefully:

#![allow(unused)]
fn main() {
use tokio::signal;

tokio::select! {
    result = process_stream(&mut stream) => {
        // Normal completion
    }
    _ = signal::ctrl_c() => {
        println!("\n[interrupted]");
    }
}
}

Timeout for Idle Streams

Set a timeout for streams that may hang:

#![allow(unused)]
fn main() {
use tokio::time::{timeout, Duration};

let result = timeout(
    Duration::from_secs(60),
    stream.next()
).await;

match result {
    Ok(Some(event)) => { /* process event */ }
    Ok(None) => { /* stream ended */ }
    Err(_) => { /* timeout */ }
}
}

Completions Streaming

The vLLM Client also supports streaming for the legacy /v1/completions API using CompletionStreamEvent.

CompletionStreamEvent Types

VariantDescription
Text(String)Text token delta
FinishReason(String)Reason why the stream finished (e.g., "stop", "length")
Usage(Usage)Token usage statistics
DoneStream completed successfully
Error(VllmError)An error occurred

Completions Streaming Example

use vllm_client::{VllmClient, json, CompletionStreamEvent};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = VllmClient::new("http://localhost:8000/v1");

    let mut stream = client
        .completions
        .create()
        .model("Qwen/Qwen2.5-7B-Instruct")
        .prompt("Write a poem about spring")
        .max_tokens(1024)
        .temperature(0.7)
        .stream(true)
        .send_stream()
        .await?;

    while let Some(event) = stream.next().await {
        match event {
            CompletionStreamEvent::Text(delta) => {
                print!("{}", delta);
                std::io::stdout().flush().ok();
            }
            CompletionStreamEvent::FinishReason(reason) => {
                println!("\n[Finish reason: {}]", reason);
            }
            CompletionStreamEvent::Usage(usage) => {
                println!("\nTokens: prompt={}, completion={}, total={}",
                    usage.prompt_tokens,
                    usage.completion_tokens,
                    usage.total_tokens
                );
            }
            CompletionStreamEvent::Done => {
                println!("\n[Stream completed]");
            }
            CompletionStreamEvent::Error(e) => {
                eprintln!("Error: {}", e);
                return Err(e.into());
            }
        }
    }

    Ok(())
}

CompletionStream Methods

MethodReturn TypeDescription
next()Option<CompletionStreamEvent>Get next event (async)
collect_text()StringCollect all text into a string
into_stream()impl StreamConvert to generic stream

Next Steps