Documentation Index
Fetch the complete documentation index at: https://crewai-devin-1778040886-fix-hitl-pre-review-silent-fallback.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
Introdução
O CrewAI fornece a capacidade de transmitir saída em tempo real durante a execução da crew, permitindo que você exiba resultados conforme são gerados, em vez de esperar que todo o processo seja concluído. Este recurso é particularmente útil para construir aplicações interativas, fornecer feedback ao usuário e monitorar processos de longa duração.
Como o Streaming Funciona
Quando o streaming está ativado, o CrewAI captura respostas do LLM e chamadas de ferramentas conforme acontecem, empacotando-as em chunks estruturados que incluem contexto sobre qual task e agent está executando. Você pode iterar sobre esses chunks em tempo real e acessar o resultado final quando a execução for concluída.
Ativando o Streaming
Para ativar o streaming, defina o parâmetro stream como True ao criar sua crew:
from crewai import Agent, Crew, Task
# Crie seus agentes e tasks
researcher = Agent(
role="Research Analyst",
goal="Gather comprehensive information on topics",
backstory="You are an experienced researcher with excellent analytical skills.",
)
task = Task(
description="Research the latest developments in AI",
expected_output="A detailed report on recent AI advancements",
agent=researcher,
)
# Ativar streaming
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True # Ativar saída em streaming
)
Streaming Síncrono
Quando você chama kickoff() em uma crew com streaming ativado, ele retorna um objeto CrewStreamingOutput que você pode iterar para receber chunks conforme chegam:
# Iniciar execução com streaming
streaming = crew.kickoff(inputs={"topic": "artificial intelligence"})
# Iterar sobre chunks conforme chegam
for chunk in streaming:
print(chunk.content, end="", flush=True)
# Acessar o resultado final após o streaming completar
result = streaming.result
print(f"\n\nSaída final: {result.raw}")
Cada chunk fornece contexto rico sobre a execução:
streaming = crew.kickoff(inputs={"topic": "AI"})
for chunk in streaming:
print(f"Task: {chunk.task_name} (índice {chunk.task_index})")
print(f"Agent: {chunk.agent_role}")
print(f"Content: {chunk.content}")
print(f"Type: {chunk.chunk_type}") # TEXT ou TOOL_CALL
if chunk.tool_call:
print(f"Tool: {chunk.tool_call.tool_name}")
print(f"Arguments: {chunk.tool_call.arguments}")
Acessando Resultados do Streaming
O objeto CrewStreamingOutput fornece várias propriedades úteis:
streaming = crew.kickoff(inputs={"topic": "AI"})
# Iterar e coletar chunks
for chunk in streaming:
print(chunk.content, end="", flush=True)
# Após a iteração completar
print(f"\nCompletado: {streaming.is_completed}")
print(f"Texto completo: {streaming.get_full_text()}")
print(f"Todos os chunks: {len(streaming.chunks)}")
print(f"Resultado final: {streaming.result.raw}")
Streaming Assíncrono
Para aplicações assíncronas, você pode usar akickoff() (async nativo) ou kickoff_async() (baseado em threads) com iteração assíncrona:
O método akickoff() fornece execução async nativa verdadeira em toda a cadeia:
import asyncio
async def stream_crew():
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True
)
# Iniciar streaming async nativo
streaming = await crew.akickoff(inputs={"topic": "AI"})
# Iteração assíncrona sobre chunks
async for chunk in streaming:
print(chunk.content, end="", flush=True)
# Acessar resultado final
result = streaming.result
print(f"\n\nSaída final: {result.raw}")
asyncio.run(stream_crew())
Para integração async mais simples ou compatibilidade retroativa:
import asyncio
async def stream_crew():
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True
)
# Iniciar streaming async baseado em threads
streaming = await crew.kickoff_async(inputs={"topic": "AI"})
# Iteração assíncrona sobre chunks
async for chunk in streaming:
print(chunk.content, end="", flush=True)
# Acessar resultado final
result = streaming.result
print(f"\n\nSaída final: {result.raw}")
asyncio.run(stream_crew())
Para cargas de trabalho de alta concorrência, akickoff() é recomendado pois usa async nativo para execução de tasks, operações de memória e recuperação de conhecimento. Consulte o guia Iniciar Crew de Forma Assíncrona para mais detalhes.
Ao executar uma crew para múltiplas entradas com kickoff_for_each(), o streaming funciona de forma diferente dependendo se você usa síncrono ou assíncrono:
kickoff_for_each Síncrono
Com kickoff_for_each() síncrono, você obtém uma lista de objetos CrewStreamingOutput, um para cada entrada:
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True
)
inputs_list = [
{"topic": "AI in healthcare"},
{"topic": "AI in finance"}
]
# Retorna lista de saídas de streaming
streaming_outputs = crew.kickoff_for_each(inputs=inputs_list)
# Iterar sobre cada saída de streaming
for i, streaming in enumerate(streaming_outputs):
print(f"\n=== Entrada {i + 1} ===")
for chunk in streaming:
print(chunk.content, end="", flush=True)
result = streaming.result
print(f"\n\nResultado {i + 1}: {result.raw}")
kickoff_for_each_async Assíncrono
Com kickoff_for_each_async() assíncrono, você obtém um único CrewStreamingOutput que produz chunks de todas as crews conforme chegam concorrentemente:
import asyncio
async def stream_multiple_crews():
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True
)
inputs_list = [
{"topic": "AI in healthcare"},
{"topic": "AI in finance"}
]
# Retorna saída de streaming única para todas as crews
streaming = await crew.kickoff_for_each_async(inputs=inputs_list)
# Chunks de todas as crews chegam conforme são gerados
async for chunk in streaming:
print(f"[{chunk.task_name}] {chunk.content}", end="", flush=True)
# Acessar todos os resultados
results = streaming.results # Lista de objetos CrewOutput
for i, result in enumerate(results):
print(f"\n\nResultado {i + 1}: {result.raw}")
asyncio.run(stream_multiple_crews())
Tipos de Chunk de Stream
Chunks podem ser de diferentes tipos, indicados pelo campo chunk_type:
Chunks TEXT
Conteúdo de texto padrão de respostas do LLM:
for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
Informações sobre chamadas de ferramentas sendo feitas:
for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TOOL_CALL:
print(f"\nChamando ferramenta: {chunk.tool_call.tool_name}")
print(f"Argumentos: {chunk.tool_call.arguments}")
Aqui está um exemplo completo mostrando como construir uma aplicação interativa com streaming:
import asyncio
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType
async def interactive_research():
# Criar crew com streaming ativado
researcher = Agent(
role="Research Analyst",
goal="Provide detailed analysis on any topic",
backstory="You are an expert researcher with broad knowledge.",
)
task = Task(
description="Research and analyze: {topic}",
expected_output="A comprehensive analysis with key insights",
agent=researcher,
)
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True,
verbose=False
)
# Obter entrada do usuário
topic = input("Digite um tópico para pesquisar: ")
print(f"\n{'='*60}")
print(f"Pesquisando: {topic}")
print(f"{'='*60}\n")
# Iniciar execução com streaming
streaming = await crew.kickoff_async(inputs={"topic": topic})
current_task = ""
async for chunk in streaming:
# Mostrar transições de task
if chunk.task_name != current_task:
current_task = chunk.task_name
print(f"\n[{chunk.agent_role}] Trabalhando em: {chunk.task_name}")
print("-" * 60)
# Exibir chunks de texto
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
# Exibir chamadas de ferramentas
elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
print(f"\n🔧 Usando ferramenta: {chunk.tool_call.tool_name}")
# Mostrar resultado final
result = streaming.result
print(f"\n\n{'='*60}")
print("Análise Completa!")
print(f"{'='*60}")
print(f"\nUso de Tokens: {result.token_usage}")
asyncio.run(interactive_research())
Casos de Uso
O streaming é particularmente valioso para:
- Aplicações Interativas: Fornecer feedback em tempo real aos usuários enquanto os agentes trabalham
- Tasks de Longa Duração: Mostrar progresso para pesquisa, análise ou geração de conteúdo
- Depuração e Monitoramento: Observar comportamento e tomada de decisão dos agentes em tempo real
- Experiência do Usuário: Reduzir latência percebida mostrando resultados incrementais
- Dashboards ao Vivo: Construir interfaces de monitoramento que exibem status de execução da crew
Cancelamento e Limpeza de Recursos
CrewStreamingOutput suporta cancelamento gracioso para que o trabalho em andamento pare imediatamente quando o consumidor desconecta.
Gerenciador de Contexto Assíncrono
streaming = await crew.akickoff(inputs={"topic": "AI"})
async with streaming:
async for chunk in streaming:
print(chunk.content, end="", flush=True)
Cancelamento Explícito
streaming = await crew.akickoff(inputs={"topic": "AI"})
try:
async for chunk in streaming:
print(chunk.content, end="", flush=True)
finally:
await streaming.aclose() # assíncrono
# streaming.close() # equivalente síncrono
Após o cancelamento, streaming.is_cancelled e streaming.is_completed são ambos True. Tanto aclose() quanto close() são idempotentes.
Notas Importantes
- O streaming ativa automaticamente o streaming do LLM para todos os agentes na crew
- Você deve iterar através de todos os chunks antes de acessar a propriedade
.result
- Para
kickoff_for_each_async() com streaming, use .results (plural) para obter todas as saídas
- O streaming adiciona overhead mínimo e pode realmente melhorar a performance percebida
- Cada chunk inclui contexto completo (task, agente, tipo de chunk) para UIs ricas
Tratamento de Erros
Trate erros durante a execução com streaming:
streaming = crew.kickoff(inputs={"topic": "AI"})
try:
for chunk in streaming:
print(chunk.content, end="", flush=True)
result = streaming.result
print(f"\nSucesso: {result.raw}")
except Exception as e:
print(f"\nErro durante o streaming: {e}")
if streaming.is_completed:
print("O streaming foi completado mas ocorreu um erro")
Ao aproveitar o streaming, você pode construir aplicações mais responsivas e interativas com o CrewAI, fornecendo aos usuários visibilidade em tempo real da execução dos agentes e resultados.