Build sophisticated multi-agent workflows with advanced orchestration patterns. Learn dependency management, parallel execution, and production-grade pipeline design.
Multi-agent workflows orchestrate specialized agents to handle complex tasks. Instead of a single agent doing everything, each agent focuses on one responsibility while the pipeline manages coordination. This tutorial shows you how to build production-grade workflows using Astreus.
Getting Started
You can clone the complete example or install just the package:
Bash
# Clone the full example
git clone https://github.com/astreus-ai/complex-workflows
cd complex-workflows
npm install

# Or install the package only
npm install @astreus-ai/astreus
The root agent handles coordination. It receives the initial input and can orchestrate the overall workflow.
Adding Task Nodes
Task nodes define individual workflow steps. Use addTaskNode() to specify what each agent should do:
TypeScript
pipeline.addTaskNode({
  name: 'research',
  prompt: 'Research the topic: {topic}. Find key facts and insights.',
  agentId: researchAgent.id
});

pipeline.addTaskNode({
  name: 'write',
  prompt: 'Write an article based on this research: {research}',
  agentId: writerAgent.id,
  dependencies: ['research']
});

pipeline.addTaskNode({
  name: 'edit',
  prompt: 'Edit and improve this article: {write}',
  agentId: editorAgent.id,
  dependencies: ['write']
});
The dependencies array defines execution order. The write task waits for research to complete. The edit task waits for writing to finish.
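To make the ordering rule concrete, here is a minimal sketch of how a dependency list can be turned into an execution plan. It is not Astreus's scheduler; the `TaskSpec` type and the `planWaves` function are hypothetical names introduced only for illustration. Tasks are grouped into "waves" in which every task already has all of its dependencies satisfied.

```typescript
// Hypothetical task shape for illustration; not the Astreus node type.
interface TaskSpec {
  name: string;
  dependencies?: string[];
}

// Groups tasks into waves: every task in a wave has all of its
// dependencies satisfied by earlier waves, so a wave could run concurrently.
function planWaves(tasks: TaskSpec[]): string[][] {
  const done: string[] = [];
  let remaining = tasks.slice();
  const waves: string[][] = [];

  while (remaining.length > 0) {
    const ready = remaining.filter((task) =>
      (task.dependencies ?? []).every((dep) => done.indexOf(dep) !== -1)
    );
    if (ready.length === 0) {
      // Nothing can start: a dependency is missing or the graph has a cycle.
      throw new Error(
        'Unresolvable dependencies: ' + remaining.map((t) => t.name).join(', ')
      );
    }
    const readyNames = ready.map((task) => task.name);
    waves.push(readyNames);
    done.push(...readyNames);
    remaining = remaining.filter((task) => readyNames.indexOf(task.name) === -1);
  }
  return waves;
}

const articleWaves = planWaves([
  { name: 'research' },
  { name: 'write', dependencies: ['research'] },
  { name: 'edit', dependencies: ['write'] },
]);
console.log(articleWaves); // [ [ 'research' ], [ 'write' ], [ 'edit' ] ]
```

For the article pipeline every wave holds a single task, which is what makes it a strictly linear workflow.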
Executing the Workflow
Run the pipeline with initial inputs:
TypeScript
const result = await pipeline.run({
  topic: 'The future of artificial intelligence'
});

if (result.success) {
  console.log('Research:', result.results.research);
  console.log('Article:', result.results.write);
  console.log('Final:', result.results.edit);
} else {
  console.error('Pipeline failed:', result.error);
}
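The run() method returns an object with a success flag, the output of each task node under results, and an error when the pipeline fails. As the example shows, you access an individual task's output by the name it was given in addTaskNode(), for instance result.results.research.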
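If several call sites read task outputs, a small guard can centralize the success check. The sketch below assumes a result shape inferred from the fields used in the example (`success`, `results`, `error`); the `PipelineResult` interface and the `requireOutput` helper are hypothetical and not part of Astreus.

```typescript
// Assumed result shape, inferred from the fields used in the example.
interface PipelineResult {
  success: boolean;
  results?: Record<string, string>;
  error?: string;
}

// Returns the output of one task, or throws with a descriptive message.
function requireOutput(result: PipelineResult, taskName: string): string {
  if (!result.success) {
    throw new Error('Pipeline failed: ' + (result.error ?? 'unknown error'));
  }
  const output = result.results ? result.results[taskName] : undefined;
  if (output === undefined) {
    throw new Error('No output recorded for task "' + taskName + '"');
  }
  return output;
}

// A hand-written result object stands in for the value returned by run().
const sampleResult: PipelineResult = {
  success: true,
  results: { research: 'Key facts...', write: 'Draft article...' },
};
console.log(requireOutput(sampleResult, 'write')); // Draft article...
```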
Parallel Execution
Tasks that do not depend on one another run in parallel. This shortens total execution time because independent agents work at the same time instead of waiting in line.
TypeScript
const analysisAgent = await Agent.create({
  name: 'Analysis Agent',
  model: 'gpt-4',
  systemPrompt: 'You analyze data and extract insights.'
});

const sentimentAgent = await Agent.create({
  name: 'Sentiment Agent',
  model: 'gpt-4',
  systemPrompt: 'You analyze emotional tone and sentiment.'
});

const summaryAgent = await Agent.create({
  name: 'Summary Agent',
  model: 'gpt-4',
  systemPrompt: 'You create concise summaries.'
});

const reportAgent = await Agent.create({
  name: 'Report Agent',
  model: 'gpt-4',
  systemPrompt: 'You compile information into comprehensive reports.'
});

const reportPipeline = new Graph({ name: 'Analysis Report Pipeline' }, analysisAgent);

// These three tasks run in parallel
reportPipeline.addTaskNode({
  name: 'analysis',
  prompt: 'Analyze this data: {data}',
  agentId: analysisAgent.id
});

reportPipeline.addTaskNode({
  name: 'sentiment',
  prompt: 'Determine sentiment of: {data}',
  agentId: sentimentAgent.id
});

reportPipeline.addTaskNode({
  name: 'summary',
  prompt: 'Summarize this data: {data}',
  agentId: summaryAgent.id
});

// This task waits for all three to complete
reportPipeline.addTaskNode({
  name: 'report',
  prompt: 'Create report combining: {analysis}, {sentiment}, {summary}',
  agentId: reportAgent.id,
  dependencies: ['analysis', 'sentiment', 'summary']
});
The analysis, sentiment, and summary tasks run concurrently since they have no dependencies. The report task combines their outputs once all three finish.
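A rough way to see the benefit is to compare the sum of all task durations with the finish time of the last task when independent tasks overlap. The sketch below does that for the report pipeline. The durations are made-up numbers, and `TimedTask` and `finishTimes` are hypothetical names used only for this estimate.

```typescript
// Hypothetical task description; the durations are invented for illustration.
interface TimedTask {
  name: string;
  seconds: number;
  dependencies?: string[];
}

// Earliest finish time of each task when every task starts as soon as its
// dependencies are done. Tasks must be listed after their dependencies.
function finishTimes(tasks: TimedTask[]): Record<string, number> {
  const finish: Record<string, number> = {};
  for (const task of tasks) {
    const deps = task.dependencies ?? [];
    const start = deps.reduce((latest, dep) => {
      const depFinish = finish[dep];
      if (depFinish === undefined) {
        throw new Error(`Task "${task.name}" is listed before its dependency "${dep}"`);
      }
      return Math.max(latest, depFinish);
    }, 0);
    finish[task.name] = start + task.seconds;
  }
  return finish;
}

const reportTasks: TimedTask[] = [
  { name: 'analysis', seconds: 8 },
  { name: 'sentiment', seconds: 5 },
  { name: 'summary', seconds: 6 },
  { name: 'report', seconds: 4, dependencies: ['analysis', 'sentiment', 'summary'] },
];

const sequentialSeconds = reportTasks.reduce((sum, t) => sum + t.seconds, 0);
const parallelSeconds = finishTimes(reportTasks)['report'];
console.log(sequentialSeconds, parallelSeconds); // 23 12
```

With these numbers the report is ready after the slowest of the three parallel tasks plus the report step itself, rather than after all four steps back to back.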
Complex Dependency Chains
Real workflows often have intricate dependency relationships. Some tasks depend on multiple predecessors, creating branching and merging patterns.
TypeScript
const dataAgent = await Agent.create({
  name: 'Data Agent',
  model: 'gpt-4',
  systemPrompt: 'You validate and clean data.'
});

const processingAgent = await Agent.create({
  name: 'Processing Agent',
  model: 'gpt-4',
  systemPrompt: 'You process and transform data.'
});

const validationAgent = await Agent.create({
  name: 'Validation Agent',
  model: 'gpt-4',
  systemPrompt: 'You validate results against requirements.'
});

const dataPipeline = new Graph({ name: 'Data Processing Pipeline' }, dataAgent);

// Initial validation
dataPipeline.addTaskNode({
  name: 'validateInput',
  prompt: 'Validate this input data: {rawData}',
  agentId: validationAgent.id
});

// Parallel processing paths
dataPipeline.addTaskNode({
  name: 'processTypeA',
  prompt: 'Process type A records: {validateInput}',
  agentId: processingAgent.id,
  dependencies: ['validateInput']
});

dataPipeline.addTaskNode({
  name: 'processTypeB',
  prompt: 'Process type B records: {validateInput}',
  agentId: processingAgent.id,
  dependencies: ['validateInput']
});

// Merge results
dataPipeline.addTaskNode({
  name: 'merge',
  prompt: 'Merge processed data: {processTypeA}, {processTypeB}',
  agentId: dataAgent.id,
  dependencies: ['processTypeA', 'processTypeB']
});

// Final validation
dataPipeline.addTaskNode({
  name: 'validateOutput',
  prompt: 'Validate merged output: {merge}',
  agentId: validationAgent.id,
  dependencies: ['merge']
});
This pipeline validates input once, processes two data types in parallel, merges the results, then validates the output. The dependency chain ensures correct execution order.
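As graphs grow, a mistyped name in a dependencies array becomes an easy mistake to make. The following sketch checks a plain list of task definitions before any pipeline is built. `NodeSpec` and `findMissingDependencies` are hypothetical helpers, not Astreus APIs, and how Astreus itself reacts to an unknown dependency is not covered here.

```typescript
// Hypothetical plain description of a task; mirrors the fields used above.
interface NodeSpec {
  name: string;
  dependencies?: string[];
}

// Lists every dependency that does not match a defined task name,
// formatted as "task -> missing dependency".
function findMissingDependencies(nodes: NodeSpec[]): string[] {
  const known = nodes.map((node) => node.name);
  const missing: string[] = [];
  for (const node of nodes) {
    for (const dep of node.dependencies ?? []) {
      if (known.indexOf(dep) === -1) {
        missing.push(`${node.name} -> ${dep}`);
      }
    }
  }
  return missing;
}

const dataNodes: NodeSpec[] = [
  { name: 'validateInput' },
  { name: 'processTypeA', dependencies: ['validateInput'] },
  { name: 'processTypeB', dependencies: ['validateInput'] },
  { name: 'merge', dependencies: ['processTypeA', 'processTypeB'] },
  { name: 'validateOutput', dependencies: ['merge'] },
];
console.log(findMissingDependencies(dataNodes)); // []
```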
Accessing Task Results
Prompts reference the outputs of other tasks with curly-brace placeholders. When one task depends on another, it can include the predecessor's output in its prompt, as {research} does in the write task above.
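The substitution itself can be pictured as a simple template fill. This is a sketch of the idea only, under the assumption that placeholders are plain word characters inside braces; `renderPrompt` is a hypothetical function and does not describe how Astreus resolves placeholders internally.

```typescript
// Replaces each {name} placeholder with the matching value. Unknown
// placeholders are left untouched so that missing inputs stay visible.
function renderPrompt(template: string, values: Record<string, string>): string {
  return template.replace(/\{(\w+)\}/g, (placeholder: string, key: string) => {
    const value = Object.prototype.hasOwnProperty.call(values, key)
      ? values[key]
      : undefined;
    return value === undefined ? placeholder : value;
  });
}

const writePrompt = renderPrompt(
  'Write an article based on this research: {research}',
  { research: 'AI adoption is accelerating.' }
);
console.log(writePrompt);
// Write an article based on this research: AI adoption is accelerating.
```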
Match agent capabilities to task requirements. This reduces costs while maintaining quality.
Real-World Example: Support Ticket System
Here's a complete workflow for handling customer support tickets:
TypeScript
import { Agent, Graph } from '@astreus-ai/astreus';

// Create specialized agents
const classifierAgent = await Agent.create({
  name: 'Classifier',
  model: 'gpt-3.5-turbo',
  systemPrompt: 'Classify support tickets into categories: technical, billing, or general.'
});

const technicalAgent = await Agent.create({
  name: 'Technical Support',
  model: 'gpt-4',
  systemPrompt: 'Provide detailed technical support and troubleshooting.'
});

const billingAgent = await Agent.create({
  name: 'Billing Support',
  model: 'gpt-3.5-turbo',
  systemPrompt: 'Handle billing inquiries and account issues.'
});

const generalAgent = await Agent.create({
  name: 'General Support',
  model: 'gpt-3.5-turbo',
  systemPrompt: 'Answer general questions and provide information.'
});

const qualityAgent = await Agent.create({
  name: 'Quality Reviewer',
  model: 'gpt-4',
  systemPrompt: 'Review support responses for accuracy and helpfulness.'
});

// Build the support pipeline
const supportPipeline = new Graph({
  name: 'Support Ticket Pipeline',
  description: 'Classify and respond to customer support tickets'
}, classifierAgent);

// Step 1: Classify the ticket (the prompt names the JSON field read below)
supportPipeline.addTaskNode({
  name: 'classify',
  prompt: 'Classify this support ticket and return JSON with a "category" field: {ticket}',
  agentId: classifierAgent.id
});

// Step 2: Handle based on category (all three run, but only relevant one matters)
supportPipeline.addTaskNode({
  name: 'handleTechnical',
  prompt: 'Provide technical support for: {ticket}',
  agentId: technicalAgent.id,
  dependencies: ['classify']
});

supportPipeline.addTaskNode({
  name: 'handleBilling',
  prompt: 'Handle billing issue: {ticket}',
  agentId: billingAgent.id,
  dependencies: ['classify']
});

supportPipeline.addTaskNode({
  name: 'handleGeneral',
  prompt: 'Answer general inquiry: {ticket}',
  agentId: generalAgent.id,
  dependencies: ['classify']
});

// Step 3: Review quality (the prompt names the JSON fields read below)
supportPipeline.addTaskNode({
  name: 'review',
  prompt: 'Review these responses: Technical: {handleTechnical}, Billing: {handleBilling}, General: {handleGeneral}. Select the best response based on classification: {classify}. Return JSON with "selectedResponse" and "confidence" fields.',
  agentId: qualityAgent.id,
  dependencies: ['handleTechnical', 'handleBilling', 'handleGeneral']
});

// Execute the pipeline
const ticket = {
  id: 'TICKET-123',
  subject: 'Cannot login to my account',
  message: 'I keep getting an error when trying to login. It says invalid credentials but I know my password is correct.',
  user: { email: '[email protected]', accountType: 'premium' }
};

const result = await supportPipeline.run({
  ticket: JSON.stringify(ticket)
});

if (result.success) {
  const classification = JSON.parse(result.results.classify);
  const response = JSON.parse(result.results.review);
  console.log('Category:', classification.category);
  console.log('Response:', response.selectedResponse);
  console.log('Confidence:', response.confidence);
} else {
  console.error('Pipeline failed:', result.error);
}
This workflow classifies tickets, generates responses from multiple specialized agents, and selects the best response through quality review. The pipeline handles the orchestration automatically.
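One practical caveat: JSON.parse throws when a model wraps its JSON in extra prose, which language models sometimes do even when asked for JSON only. A defensive parser is one way to soften that. The sketch below is a generic helper; `parseJsonOutput` is a hypothetical name, and the approach (take the text between the first opening brace and the last closing brace) is a simple heuristic rather than a complete solution.

```typescript
// Tries to read a JSON object from model output that may be surrounded by
// prose. Returns null when no parseable object is found.
function parseJsonOutput(raw: string): Record<string, unknown> | null {
  const start = raw.indexOf('{');
  const end = raw.lastIndexOf('}');
  if (start === -1 || end <= start) {
    return null;
  }
  try {
    const parsed: unknown = JSON.parse(raw.slice(start, end + 1));
    return typeof parsed === 'object' && parsed !== null
      ? (parsed as Record<string, unknown>)
      : null;
  } catch {
    return null;
  }
}

const noisyOutput = 'Sure! {"category": "technical"} Hope that helps.';
const classification = parseJsonOutput(noisyOutput);
console.log(classification ? classification['category'] : 'unparseable'); // technical
```

Returning null instead of throwing lets the caller decide whether to fall back to a default category or to re-run the task.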
Key Concepts
Successful workflow design follows several principles. First, create specialized agents with clear responsibilities. Each agent should excel at one type of task rather than trying to do everything.
Second, minimize dependencies where possible. Independent tasks run in parallel, reducing total execution time. Only add dependencies when one task truly needs another's output.
Third, design prompts that reference other task outputs explicitly. Use {taskId} syntax to pull results from dependencies. This makes data flow clear and maintainable.
Fourth, handle errors gracefully at the pipeline level. Check result.success before accessing task outputs. Plan for failures and implement appropriate recovery strategies.
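One possible recovery strategy is to re-run a failed pipeline a limited number of times with a growing delay. The sketch below is written against an assumed result shape (`success`, `results`, `error`, as used in the examples) and takes the run as a callback, so it makes no claims about Astreus internals. `RunResult`, `sleep`, and `runWithRetry` are hypothetical names.

```typescript
// Assumed result shape, inferred from the fields used in the examples.
interface RunResult {
  success: boolean;
  results?: Record<string, string>;
  error?: string;
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Calls `run` until it reports success or the attempts are exhausted,
// doubling the wait between attempts. Thrown errors count as failures.
async function runWithRetry(
  run: () => Promise<RunResult>,
  maxAttempts = 3,
  baseDelayMs = 500
): Promise<RunResult> {
  let last: RunResult = { success: false, error: 'not run' };
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      last = await run();
    } catch (err) {
      last = {
        success: false,
        error: err instanceof Error ? err.message : String(err),
      };
    }
    if (last.success) {
      return last;
    }
    if (attempt < maxAttempts) {
      await sleep(baseDelayMs * 2 ** (attempt - 1));
    }
  }
  return last;
}

// Intended use, with `pipeline` being a graph built as shown earlier:
//   const result = await runWithRetry(() => pipeline.run({ topic: '...' }));
```

Retrying the whole pipeline repeats every task, including the ones that already succeeded, so this fits short pipelines better than long or expensive ones.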
Advanced Patterns
Complex workflows can implement sophisticated patterns. Implement map-reduce by running the same agent on multiple inputs in parallel, then combining results:
TypeScript
const documents = ['doc1.txt', 'doc2.txt', 'doc3.txt', 'doc4.txt'];

const summaryPipeline = new Graph({ name: 'Document Summary' }, summaryAgent);

// Add a task for each document (runs in parallel)
documents.forEach((doc, index) => {
  summaryPipeline.addTaskNode({
    name: `summarize_${index}`,
    prompt: `Summarize this document: {document_${index}}`,
    agentId: summaryAgent.id
  });
});

// Combine all summaries
summaryPipeline.addTaskNode({
  name: 'combine',
  prompt: `Combine these summaries into one: ${documents.map((_, i) => `{summarize_${i}}`).join(', ')}`,
  agentId: summaryAgent.id,
  dependencies: documents.map((_, i) => `summarize_${i}`)
});
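The prompts above reference placeholders named {document_0}, {document_1}, and so on, so the inputs passed to run() need matching keys. A small helper can build that object from the document texts. `buildDocumentInputs` is a hypothetical name, and the sketch assumes the document contents have already been loaded as strings.

```typescript
// Builds an inputs object whose keys match the {document_N} placeholders:
// { document_0: <text>, document_1: <text>, ... }
function buildDocumentInputs(texts: string[]): Record<string, string> {
  const inputs: Record<string, string> = {};
  texts.forEach((text, index) => {
    inputs[`document_${index}`] = text;
  });
  return inputs;
}

const documentInputs = buildDocumentInputs([
  'First document text',
  'Second document text',
]);
console.log(documentInputs);
// { document_0: 'First document text', document_1: 'Second document text' }
```

The resulting object would then be passed as the argument to run(), in the same way the earlier examples pass topic or ticket.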
Implement validation chains where each stage validates a different aspect of the input.
Each validation stage only runs if the previous stage passes. This creates fail-fast behavior that saves resources.
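The fail-fast idea can be sketched independently of any agent framework. In the example below, plain functions stand in for the agent-backed stages, and the chain stops at the first stage that fails. `Stage`, `StageResult`, `ChainOutcome`, and `runValidationChain` are hypothetical names, and the three checks are arbitrary illustrations.

```typescript
interface StageResult {
  ok: boolean;
  reason?: string;
}

interface Stage {
  name: string;
  check: (input: string) => StageResult;
}

interface ChainOutcome {
  passed: boolean;
  ranStages: string[];
  failedAt?: string;
  reason?: string;
}

// Runs stages in order and stops at the first failure, so later
// (possibly more expensive) stages never execute on bad input.
function runValidationChain(input: string, stages: Stage[]): ChainOutcome {
  const ranStages: string[] = [];
  for (const stage of stages) {
    ranStages.push(stage.name);
    const result = stage.check(input);
    if (!result.ok) {
      return { passed: false, ranStages, failedAt: stage.name, reason: result.reason };
    }
  }
  return { passed: true, ranStages };
}

const validationStages: Stage[] = [
  {
    name: 'format',
    check: (s) => (s.trim().length > 0 ? { ok: true } : { ok: false, reason: 'input is empty' }),
  },
  {
    name: 'length',
    check: (s) => (s.length <= 20 ? { ok: true } : { ok: false, reason: 'input exceeds 20 characters' }),
  },
  {
    name: 'content',
    check: (s) => (/\d/.test(s) ? { ok: true } : { ok: false, reason: 'input contains no digit' }),
  },
];

console.log(runValidationChain('order 42', validationStages)); // passes; all three stages ran
console.log(runValidationChain('', validationStages)); // fails at 'format'; later stages never ran
```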
Conclusion
Astreus workflows coordinate multiple specialized agents through dependency-based execution. The core pattern is simple: create agents with Agent.create(), initialize a pipeline with new Graph(), add tasks with addTaskNode(), and execute with run().
Parallel execution happens automatically when tasks lack dependencies. Complex workflows emerge from combining simple patterns. Start with linear pipelines, then add parallel branches as needed. Always design for failure by checking result status and handling errors appropriately.
The key to effective workflows is matching agent capabilities to task requirements and minimizing unnecessary dependencies. This maximizes parallelism while maintaining correct execution order.
This experiment is written for Astreus v0.5.37. Please ensure you are using a compatible version.