Introduction
CrewAI provides the ability to stream real-time output during crew execution, allowing you to display results as they’re generated rather than waiting for the entire process to complete. This feature is particularly useful for building interactive applications, providing user feedback, and monitoring long-running processes.
How Streaming Works
When streaming is enabled, CrewAI captures LLM responses and tool calls as they happen, packaging them into structured chunks that include context about which task and agent is executing. You can iterate over these chunks in real-time and access the final result once execution completes.
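Conceptually, each chunk carries the fields shown below. This is a simplified, illustrative sketch of a chunk's shape, based on the attributes used in the examples on this page; it is not the library's actual class definition, and the field types are assumptions:

from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class StreamChunkSketch:
    # Illustrative only — field names mirror the attributes used below
    # (chunk.content, chunk.task_name, etc.); types are guesses.
    content: str                     # incremental text (or tool-call payload)
    chunk_type: str                  # TEXT or TOOL_CALL
    task_name: str                   # name of the task currently executing
    task_index: int                  # position of that task in the crew
    agent_role: str                  # role of the agent producing the chunk
    tool_call: Optional[Any] = None  # tool name and arguments, when applicable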
Enabling Streaming
To enable streaming, set the stream parameter to True when creating your crew:
from crewai import Agent, Crew, Task

# Create your agents and tasks
researcher = Agent(
    role="Research Analyst",
    goal="Gather comprehensive information on topics",
    backstory="You are an experienced researcher with excellent analytical skills.",
)

task = Task(
    description="Research the latest developments in AI",
    expected_output="A detailed report on recent AI advancements",
    agent=researcher,
)

# Enable streaming
crew = Crew(
    agents=[researcher],
    tasks=[task],
    stream=True,  # Enable streaming output
)
Synchronous Streaming
When you call kickoff() on a crew with streaming enabled, it returns a CrewStreamingOutput object that you can iterate over to receive chunks as they arrive:
# Start streaming execution
streaming = crew.kickoff(inputs={"topic": "artificial intelligence"})

# Iterate over chunks as they arrive
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result.raw}")
Each chunk provides rich context about the execution:
streaming = crew.kickoff(inputs={"topic": "AI"})

for chunk in streaming:
    print(f"Task: {chunk.task_name} (index {chunk.task_index})")
    print(f"Agent: {chunk.agent_role}")
    print(f"Content: {chunk.content}")
    print(f"Type: {chunk.chunk_type}")  # TEXT or TOOL_CALL
    if chunk.tool_call:
        print(f"Tool: {chunk.tool_call.tool_name}")
        print(f"Arguments: {chunk.tool_call.arguments}")
Accessing Streaming Results
The CrewStreamingOutput object provides several useful properties:
streaming = crew.kickoff(inputs={"topic": "AI"})

# Iterate and collect chunks
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Full text: {streaming.get_full_text()}")
print(f"All chunks: {len(streaming.chunks)}")
print(f"Final result: {streaming.result.raw}")
Asynchronous Streaming
For async applications, you can use either akickoff() (native async) or kickoff_async() (thread-based) with async iteration:
Native Async with akickoff()
The akickoff() method provides true native async execution throughout the entire chain:
import asyncio

async def stream_crew():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True,
    )

    # Start native async streaming
    streaming = await crew.akickoff(inputs={"topic": "AI"})

    # Async iteration over chunks
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # Access final result
    result = streaming.result
    print(f"\n\nFinal output: {result.raw}")

asyncio.run(stream_crew())
Thread-Based Async with kickoff_async()
For simpler async integration or backward compatibility:
import asyncio

async def stream_crew():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True,
    )

    # Start thread-based async streaming
    streaming = await crew.kickoff_async(inputs={"topic": "AI"})

    # Async iteration over chunks
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # Access final result
    result = streaming.result
    print(f"\n\nFinal output: {result.raw}")

asyncio.run(stream_crew())
For high-concurrency workloads, akickoff() is recommended as it uses native async for task execution, memory operations, and knowledge retrieval. See the Kickoff Crew Asynchronously guide for more details.
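The native-async path also composes cleanly with other async code. As one illustration, you can wrap the stream in a small async generator that yields only text, suitable for piping into a web response. This is a hedged sketch: text_stream is a helper name introduced here, not a CrewAI API; it relies only on akickoff(), async iteration, and the async context manager shown later on this page:

from collections.abc import AsyncIterator

async def text_stream(crew, inputs: dict) -> AsyncIterator[str]:
    # Hypothetical helper (not part of CrewAI): start a native-async kickoff
    # and yield each chunk's text, closing the stream if the consumer
    # stops iterating early.
    streaming = await crew.akickoff(inputs=inputs)
    async with streaming:
        async for chunk in streaming:
            yield chunk.content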
Streaming with kickoff_for_each
When executing a crew for multiple inputs with kickoff_for_each(), streaming works differently depending on whether you use sync or async:
Synchronous kickoff_for_each
With synchronous kickoff_for_each(), you get a list of CrewStreamingOutput objects, one for each input:
crew = Crew(
    agents=[researcher],
    tasks=[task],
    stream=True,
)

inputs_list = [
    {"topic": "AI in healthcare"},
    {"topic": "AI in finance"},
]

# Returns list of streaming outputs
streaming_outputs = crew.kickoff_for_each(inputs=inputs_list)

# Iterate over each streaming output
for i, streaming in enumerate(streaming_outputs):
    print(f"\n=== Input {i + 1} ===")
    for chunk in streaming:
        print(chunk.content, end="", flush=True)
    result = streaming.result
    print(f"\n\nResult {i + 1}: {result.raw}")
Asynchronous kickoff_for_each_async
With async kickoff_for_each_async(), you get a single CrewStreamingOutput that yields chunks from all crews as they arrive concurrently:
import asyncio

async def stream_multiple_crews():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True,
    )

    inputs_list = [
        {"topic": "AI in healthcare"},
        {"topic": "AI in finance"},
    ]

    # Returns single streaming output for all crews
    streaming = await crew.kickoff_for_each_async(inputs=inputs_list)

    # Chunks from all crews arrive as they're generated
    async for chunk in streaming:
        print(f"[{chunk.task_name}] {chunk.content}", end="", flush=True)

    # Access all results
    results = streaming.results  # List of CrewOutput objects
    for i, result in enumerate(results):
        print(f"\n\nResult {i + 1}: {result.raw}")

asyncio.run(stream_multiple_crews())
Stream Chunk Types
Chunks can be of different types, indicated by the chunk_type field:
TEXT Chunks
Standard text content from LLM responses:
from crewai.types.streaming import StreamChunkType

for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TEXT:
        print(chunk.content, end="", flush=True)
TOOL_CALL Chunks
Information about tool calls being made:
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TOOL_CALL:
        print(f"\nCalling tool: {chunk.tool_call.tool_name}")
        print(f"Arguments: {chunk.tool_call.arguments}")
Practical Example: Building a UI with Streaming
Here’s a complete example showing how to build an interactive application with streaming:
import asyncio

from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType

async def interactive_research():
    # Create crew with streaming enabled
    researcher = Agent(
        role="Research Analyst",
        goal="Provide detailed analysis on any topic",
        backstory="You are an expert researcher with broad knowledge.",
    )

    task = Task(
        description="Research and analyze: {topic}",
        expected_output="A comprehensive analysis with key insights",
        agent=researcher,
    )

    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True,
        verbose=False,
    )

    # Get user input
    topic = input("Enter a topic to research: ")

    print(f"\n{'='*60}")
    print(f"Researching: {topic}")
    print(f"{'='*60}\n")

    # Start streaming execution
    streaming = await crew.kickoff_async(inputs={"topic": topic})

    current_task = ""
    async for chunk in streaming:
        # Show task transitions
        if chunk.task_name != current_task:
            current_task = chunk.task_name
            print(f"\n[{chunk.agent_role}] Working on: {chunk.task_name}")
            print("-" * 60)

        # Display text chunks
        if chunk.chunk_type == StreamChunkType.TEXT:
            print(chunk.content, end="", flush=True)

        # Display tool calls
        elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
            print(f"\n🔧 Using tool: {chunk.tool_call.tool_name}")

    # Show final result
    result = streaming.result
    print(f"\n\n{'='*60}")
    print("Analysis Complete!")
    print(f"{'='*60}")
    print(f"\nToken Usage: {result.token_usage}")

asyncio.run(interactive_research())
Use Cases
Streaming is particularly valuable for:
- Interactive Applications: Provide real-time feedback to users as agents work
- Long-Running Tasks: Show progress for research, analysis, or content generation
- Debugging and Monitoring: Observe agent behavior and decision-making in real-time
- User Experience: Reduce perceived latency by showing incremental results
- Live Dashboards: Build monitoring interfaces that display crew execution status
Cancellation and Resource Cleanup
CrewStreamingOutput supports graceful cancellation so that in-flight work stops promptly when the consumer disconnects.
Async Context Manager
streaming = await crew.akickoff(inputs={"topic": "AI"})

async with streaming:
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)
Explicit Cancellation
streaming = await crew.akickoff(inputs={"topic": "AI"})

try:
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)
finally:
    await streaming.aclose()  # async
    # streaming.close()  # sync equivalent
After cancellation, streaming.is_cancelled and streaming.is_completed are both True. Both aclose() and close() are idempotent.
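For example, a consumer that stops early can break out of the loop and close the stream; afterwards both flags report the cancelled-and-finished state. A minimal sketch, assuming the flag and idempotency behavior described above:

streaming = await crew.akickoff(inputs={"topic": "AI"})

async for chunk in streaming:
    print(chunk.content, end="", flush=True)
    break  # consumer disconnects after the first chunk

await streaming.aclose()
await streaming.aclose()  # idempotent: a second call is a no-op

print(streaming.is_cancelled)  # True
print(streaming.is_completed)  # True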
Important Notes
- Streaming automatically enables LLM streaming for all agents in the crew
- You must iterate through all chunks before accessing the .result property (see the sketch after this list)
- For kickoff_for_each_async() with streaming, use .results (plural) to get all outputs
- Streaming adds minimal overhead and can actually improve perceived performance
- Each chunk includes full context (task, agent, chunk type) for rich UIs
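If you only care about the final output, you still need to drain the iterator first. A minimal pattern for consuming chunks without displaying them:

streaming = crew.kickoff(inputs={"topic": "AI"})

# Drain all chunks without displaying them, then read the final result
for _chunk in streaming:
    pass

result = streaming.result
print(result.raw)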
Error Handling
Handle errors during streaming execution:
streaming = crew.kickoff(inputs={"topic": "AI"})

try:
    for chunk in streaming:
        print(chunk.content, end="", flush=True)
    result = streaming.result
    print(f"\nSuccess: {result.raw}")
except Exception as e:
    print(f"\nError during streaming: {e}")
    if streaming.is_completed:
        print("Streaming completed but an error occurred")
By leveraging streaming, you can build more responsive and interactive applications with CrewAI, providing users with real-time visibility into agent execution and results.