Core Workflow Architecture Principles
1. Single Responsibility Workflows
Each workflow should have one clear purpose:
// Illustrative sketches only (not runnable as-is): each object outlines a
// workflow definition to contrast focused vs. unfocused design.
// Good: Focused workflow
{
name: 'Process Customer Orders',
nodes: [/* order processing specific nodes */]
}
// Good: Another focused workflow
{
name: 'Send Order Notifications',
nodes: [/* notification specific nodes */]
}
// Bad: Mixed responsibilities
// One workflow doing orders + email + inventory is harder to test, reuse,
// and monitor than three focused ones.
{
name: 'Process Orders And Send Emails And Update Inventory',
nodes: [/* too many mixed concerns */]
}
2. Workflow Composition
Use sub-workflows for complex processes:
// Main orchestrator workflow
// Delegates work to other workflows via the Execute Workflow node; the
// target workflow id is resolved at runtime from the incoming item's JSON.
const mainWorkflow = {
nodes: [
{
type: 'n8n-nodes-base.executeWorkflow',
parameters: {
// Expression: picks the sub-workflow id off the current item.
workflowId: '{{$json.workflowToExecute}}',
mode: 'each'
}
}
]
};
// Modular sub-workflows
// Names/ids of the focused sub-workflows the orchestrator dispatches to.
const subWorkflows = [
'data-validation-workflow',
'data-transformation-workflow',
'data-storage-workflow'
];
Essential Workflow Patterns
Pattern 1: Error Recovery Pipeline
Example:
// Workflow with comprehensive error handling.
// Flow: Try Processing -> Error Router; successes go to Success Handler,
// failures go to Retry Handler, which waits and loops back to Try Processing.
// NOTE(review): the retry loop has no attempt counter, so a permanently
// failing item retries forever — in production, track attempts and route
// exhausted items to the Dead Letter Queue node below.
const errorRecoveryWorkflow = {
  nodes: [
    {
      // Wraps the main logic so failures become data ({success: false})
      // instead of aborting the execution.
      name: 'Try Processing',
      type: 'n8n-nodes-base.function',
      parameters: {
        functionCode: `
try {
// Main processing logic
const result = await processData($input.all());
return [{json: {success: true, data: result}}];
} catch (error) {
return [{json: {success: false, error: error.message}}];
}
`
      }
    },
    {
      // Routes items on the success flag produced above.
      name: 'Error Router',
      type: 'n8n-nodes-base.switch',
      parameters: {
        conditions: [
          {
            condition: {
              leftValue: '={{$json.success}}',
              rightValue: true
            }
          }
        ]
      }
    },
    {
      // Terminal node for the success branch. This node was referenced in
      // `connections` but missing from `nodes` — added here to fix the
      // dangling reference.
      name: 'Success Handler',
      type: 'n8n-nodes-base.noOp',
      parameters: {}
    },
    {
      // Simple fixed backoff before re-attempting failed items.
      name: 'Retry Handler',
      type: 'n8n-nodes-base.wait',
      parameters: {
        amount: 5,
        unit: 'seconds'
      }
    },
    {
      // Persists permanently failed items for later inspection/replay.
      // (Wire this to the retry path once a max-attempts check exists.)
      name: 'Dead Letter Queue',
      type: 'n8n-nodes-base.postgres',
      parameters: {
        operation: 'insert',
        table: 'failed_processes',
        columns: 'error_message,data,timestamp'
      }
    }
  ],
  connections: {
    'Try Processing': {
      main: [['Error Router']]
    },
    'Error Router': {
      // Output 0: success branch; output 1: failure branch.
      main: [
        ['Success Handler'],
        ['Retry Handler']
      ]
    },
    'Retry Handler': {
      main: [['Try Processing']]
    }
  }
};
Pattern 2: Batch Processing with Pagination
Example:
// Efficient batch processing workflow
// Pattern: fetch one page at a time, fan the page out into small
// sub-batches, transform items, then advance the offset and loop while
// `hasMore` is true.
// NOTE(review): no `connections` map is shown, so the loop wiring
// (Check More Data -> Update Offset -> Fetch Batch) is implied, not
// defined — confirm in the real workflow JSON.
const batchProcessor = {
nodes: [
{
// Seeds the paging state carried through the loop.
name: 'Initialize',
type: 'n8n-nodes-base.function',
parameters: {
functionCode: `
// Setup batch parameters
return [{
json: {
batchSize: 100,
offset: 0,
hasMore: true,
processedCount: 0
}
}];
`
}
},
{
// Pulls one page of records; limit/offset come from the paging state.
name: 'Fetch Batch',
type: 'n8n-nodes-base.httpRequest',
parameters: {
url: 'https://api.example.com/data',
qs: {
limit: '={{$json.batchSize}}',
offset: '={{$json.offset}}'
}
}
},
{
// Splits the fetched page into sub-batches of 10 items.
name: 'Process Items',
type: 'n8n-nodes-base.splitInBatches',
parameters: {
batchSize: 10
}
},
{
// Transforms all items of a sub-batch concurrently.
// Assumes `processItem` is available in the Function node sandbox —
// TODO confirm where it is defined.
name: 'Transform Data',
type: 'n8n-nodes-base.function',
parameters: {
functionCode: `
// Parallel processing of batch items
const promises = $input.all().map(async (item) => {
const processed = await processItem(item.json);
return {json: processed};
});
return Promise.all(promises);
`
}
},
{
// Decides whether another page needs to be fetched.
name: 'Check More Data',
type: 'n8n-nodes-base.if',
parameters: {
conditions: {
boolean: [{
value1: '={{$json.hasMore}}',
value2: true
}]
}
}
},
{
// Advances the offset by one batch for the next iteration.
name: 'Update Offset',
type: 'n8n-nodes-base.function',
parameters: {
functionCode: `
const current = $input.all()[0].json;
return [{
json: {
...current,
offset: current.offset + current.batchSize
}
}];
`
}
}
]
};
Pattern 3: Event-Driven Architecture
Example:
// Event dispatcher workflow
// Receives events over a webhook, validates their shape, then routes them
// by eventType to dedicated handler sub-workflows.
// NOTE(review): the router lists four event types but only two handler
// nodes are shown, and no `connections` map is included — the remaining
// wiring is implied.
const eventDispatcher = {
nodes: [
{
// Entry point; responds immediately so the event producer is not
// blocked while handlers run.
name: 'Webhook Trigger',
type: 'n8n-nodes-base.webhook',
parameters: {
path: 'events',
responseMode: 'immediately',
responseData: 'success'
}
},
{
// Rejects events missing any required field by throwing, which fails
// the item instead of passing malformed data downstream.
name: 'Validate Event',
type: 'n8n-nodes-base.function',
parameters: {
functionCode: `
const event = $input.all()[0].json;
// Validate event structure
const requiredFields = ['eventType', 'payload', 'timestamp'];
const isValid = requiredFields.every(field => field in event);
if (!isValid) {
throw new Error('Invalid event structure');
}
return [{json: {
...event,
validated: true
}}];
`
}
},
{
// Fans events out: one switch output per known eventType.
name: 'Event Router',
type: 'n8n-nodes-base.switch',
parameters: {
dataPropertyName: 'eventType',
values: [
{ value: 'user.created' },
{ value: 'order.placed' },
{ value: 'payment.processed' },
{ value: 'inventory.updated' }
]
}
},
{
// Delegates user.created events to a dedicated sub-workflow.
name: 'User Handler',
type: 'n8n-nodes-base.executeWorkflow',
parameters: {
workflowId: 'user-created-workflow'
}
},
{
// Delegates order.placed events to a dedicated sub-workflow.
name: 'Order Handler',
type: 'n8n-nodes-base.executeWorkflow',
parameters: {
workflowId: 'order-processing-workflow'
}
}
]
};
Pattern 4: State Machine Implementation
Example:
// State machine for order processing
// Loads the order's persisted state, routes on it, runs the state's
// handler, then persists the transition back to the database.
const stateMachine = {
nodes: [
{
// Reads the current state row for the incoming order.
// NOTE(review): the `where` value interpolates {{$json.orderId}}
// directly into the clause — if orderId can come from untrusted
// input this is a SQL-injection risk; prefer the node's
// parameterized query options.
name: 'Load State',
type: 'n8n-nodes-base.postgres',
parameters: {
operation: 'select',
table: 'order_states',
where: 'order_id={{$json.orderId}}'
}
},
{
// One switch output per lifecycle state.
name: 'State Router',
type: 'n8n-nodes-base.switch',
parameters: {
dataPropertyName: 'currentState',
values: [
{ value: 'pending' },
{ value: 'processing' },
{ value: 'shipped' },
{ value: 'delivered' },
{ value: 'cancelled' }
]
}
},
{
// Handler for 'pending': moves the order to 'processing' when payment
// checks out, otherwise cancels it with a reason.
// Assumes `validatePayment` exists in the sandbox — TODO confirm.
name: 'Process Pending',
type: 'n8n-nodes-base.function',
parameters: {
functionCode: `
const order = $input.all()[0].json;
// Validate payment
const paymentValid = await validatePayment(order.paymentId);
if (paymentValid) {
return [{json: {
...order,
currentState: 'processing',
nextAction: 'prepare_shipment'
}}];
}
return [{json: {
...order,
currentState: 'cancelled',
reason: 'payment_failed'
}}];
`
}
},
{
// Persists the new state after a transition.
name: 'Update State',
type: 'n8n-nodes-base.postgres',
parameters: {
operation: 'update',
table: 'order_states',
updateColumns: 'currentState,updatedAt,metadata'
}
}
]
};
Advanced Workflow Techniques
Dynamic Workflow Generation
Example:
// Generate workflows programmatically.
// Builds a linear workflow (Trigger -> step 1 -> step 2 -> ...) from a
// declarative config; nodes are laid out left-to-right, 200px apart.

/** One processing step of a generated workflow. */
interface WorkflowStepConfig {
  name: string;
  nodeType: string;
  parameters: Record<string, unknown>;
}

/** Declarative description of the workflow to generate.
 *  (Was referenced but never declared — defined here so the example
 *  compiles standalone.) */
interface WorkflowConfig {
  name: string;
  triggerType: string;
  triggerParams: Record<string, unknown>;
  steps: WorkflowStepConfig[];
}

/** A node in the generated workflow definition. */
interface GeneratedNode {
  name: string;
  type: string;
  position: [number, number];
  parameters: Record<string, unknown>;
}

/** The generated workflow: nodes plus name-keyed connection map. */
interface GeneratedWorkflow {
  name: string;
  nodes: GeneratedNode[];
  connections: Record<string, { main: string[][] }>;
}

function createDynamicWorkflow(config: WorkflowConfig): GeneratedWorkflow {
  // Explicit type so `nodes`/`connections` are not inferred as never[]/{}
  // under strict mode (the original untyped literal made every push and
  // index assignment a type error).
  const workflow: GeneratedWorkflow = {
    name: config.name,
    nodes: [],
    connections: {}
  };
  // Add trigger node
  workflow.nodes.push({
    name: 'Trigger',
    type: config.triggerType,
    position: [250, 300],
    parameters: config.triggerParams
  });
  // Add processing nodes dynamically, chaining each to its predecessor.
  // Assumes step names are unique — duplicate names would overwrite an
  // earlier connection entry.
  config.steps.forEach((step, index) => {
    const node: GeneratedNode = {
      name: step.name,
      type: step.nodeType,
      position: [250 + (index + 1) * 200, 300],
      parameters: step.parameters
    };
    workflow.nodes.push(node);
    // Connect to previous node (the trigger for the first step).
    const prevNode = index === 0 ? 'Trigger' : config.steps[index - 1].name;
    workflow.connections[prevNode] = {
      main: [[node.name]]
    };
  });
  return workflow;
}

// Usage
const dynamicWorkflow = createDynamicWorkflow({
  name: 'Generated Workflow',
  triggerType: 'n8n-nodes-base.cron',
  triggerParams: { cronExpression: '0 */6 * * *' },
  steps: [
    {
      name: 'Fetch Data',
      nodeType: 'n8n-nodes-base.httpRequest',
      parameters: { url: 'https://api.example.com/data' }
    },
    {
      name: 'Process',
      nodeType: 'n8n-nodes-base.function',
      parameters: { functionCode: 'return items;' }
    }
  ]
});
Workflow Orchestration
Example:
// Master orchestrator for complex processes
// Runs nightly: pulls pending jobs from a queue table, maps each job to
// its workflow, executes them, and records completion.
const orchestrator = {
name: 'Master Orchestrator',
nodes: [
{
// Fires once a day at midnight.
name: 'Schedule Trigger',
type: 'n8n-nodes-base.cron',
parameters: {
cronExpression: '0 0 * * *'
}
},
{
// Picks up to 100 due jobs, highest priority / oldest first.
name: 'Load Job Queue',
type: 'n8n-nodes-base.postgres',
parameters: {
operation: 'select',
query: `
SELECT * FROM job_queue
WHERE status = 'pending'
AND scheduled_time <= NOW()
ORDER BY priority DESC, created_at ASC
LIMIT 100
`
}
},
{
// Maps each job row to the workflow that should run it.
// Assumes `getWorkflowForJobType` exists in the Function node
// sandbox — TODO confirm where it is defined.
name: 'Job Dispatcher',
type: 'n8n-nodes-base.function',
parameters: {
functionCode: `
const jobs = $input.all();
const results = [];
for (const job of jobs) {
const workflowId = getWorkflowForJobType(job.json.type);
results.push({
json: {
jobId: job.json.id,
workflowId: workflowId,
payload: job.json.payload,
priority: job.json.priority
}
});
}
return results;
`
}
},
{
// Executes each job's workflow; the id is resolved per item.
name: 'Execute Jobs',
type: 'n8n-nodes-base.executeWorkflow',
parameters: {
workflowId: '={{$json.workflowId}}',
mode: 'queue'
}
},
{
// Marks jobs done and stores their results.
name: 'Update Job Status',
type: 'n8n-nodes-base.postgres',
parameters: {
operation: 'update',
table: 'job_queue',
updateColumns: 'status,completed_at,result'
}
}
]
};
Performance Optimization Patterns
Example:
// Parallel processing pattern
// Splits the input into fixed-size chunks, processes chunks via a
// sub-workflow with bounded parallelism, then merges the results.
const parallelProcessor = {
nodes: [
{
// Chunks the incoming items (50 per chunk) and tags each chunk with
// its index so results can be traced back to their source chunk.
name: 'Split Data',
type: 'n8n-nodes-base.function',
parameters: {
functionCode: `
const items = $input.all();
const chunkSize = 50;
const chunks = [];
for (let i = 0; i < items.length; i += chunkSize) {
chunks.push({
json: {
chunk: items.slice(i, i + chunkSize),
chunkIndex: Math.floor(i / chunkSize)
}
});
}
return chunks;
`
}
},
{
// Runs the chunk-processor sub-workflow, at most 5 chunks at once.
name: 'Process Parallel',
type: 'n8n-nodes-base.executeWorkflow',
parameters: {
workflowId: 'chunk-processor',
mode: 'parallel',
maxParallel: 5
}
},
{
// Flattens per-chunk results into a single summary item.
// NOTE(review): assumes the sub-workflow returns its output under
// json.results — confirm against chunk-processor's output shape.
name: 'Merge Results',
type: 'n8n-nodes-base.function',
parameters: {
functionCode: `
const chunks = $input.all();
const merged = chunks.flatMap(chunk => chunk.json.results);
return [{
json: {
totalProcessed: merged.length,
results: merged
}
}];
`
}
}
]
};
Testing Workflows
Unit Testing with Jest
Example:
// workflow.test.ts
// Jest unit tests for the order-processing workflow, run against an
// in-process WorkflowExecute with mocked node types.
import { WorkflowExecute } from 'n8n-core';
import { createMockExecuteFunctions } from './test-utils';
describe('Order Processing Workflow', () => {
let workflow: WorkflowExecute;
beforeEach(() => {
workflow = new WorkflowExecute();
});
test('should process valid order', async () => {
// NOTE(review): `input` is built but never handed to workflow.run —
// wire it into the run config / trigger data, or the test is not
// actually exercising this payload. Verify against the n8n-core API.
const input = {
orderId: '123',
amount: 100,
customerId: 'cust_456'
};
const result = await workflow.run({
nodes: orderWorkflowNodes,
connections: orderWorkflowConnections,
active: true,
nodeTypes: mockNodeTypes,
staticData: {},
settings: {}
});
// Asserts on the first item of the first main output branch.
expect(result.data.main[0][0].json.status).toBe('processed');
});
test('should handle payment failure', async () => {
// NOTE(review): same issue — `input` is unused by workflow.run below.
const input = {
orderId: '124',
amount: -1, // Invalid amount
customerId: 'cust_456'
};
const result = await workflow.run({
// ... workflow config
});
expect(result.data.main[0][0].json.status).toBe('failed');
expect(result.data.main[0][0].json.error).toContain('payment');
});
});
Integration Testing
Example:
# Test workflow via API
# Sends a JSON payload to the workflow's test-webhook endpoint.
# NOTE(review): webhook-test paths are typically only live while the
# editor is listening for a test execution — confirm for your setup.
curl -X POST http://localhost:5678/webhook-test/workflow-id \
-H "Content-Type: application/json" \
-d '{"test": "data"}'
# Monitor execution
# Lists executions for workflow id 1 via the REST API.
curl http://localhost:5678/api/v1/executions?workflowId=1
Monitoring and Observability
Custom Logging Node
Example:
// LoggingNode.node.ts
// Custom n8n node: forwards each execution's items to an external logging
// service, then passes the items through unchanged.
export class LoggingNode implements INodeType {
description: INodeTypeDescription = {
displayName: 'Custom Logger',
name: 'customLogger',
group: ['utility'],
version: 1,
description: 'Log workflow execution details',
inputs: ['main'],
outputs: ['main'],
properties: [
{
// Severity attached to every log entry this node emits.
displayName: 'Log Level',
name: 'logLevel',
type: 'options',
options: [
{ name: 'Debug', value: 'debug' },
{ name: 'Info', value: 'info' },
{ name: 'Warning', value: 'warning' },
{ name: 'Error', value: 'error' }
],
default: 'info'
}
]
};
// Builds one log entry describing this node's input and POSTs it to the
// logging service; input items are returned unchanged (pass-through).
// NOTE(review): the awaited request has no try/catch, so an unreachable
// logging service fails the whole workflow run — catch and continue if
// logging is meant to be best-effort.
// NOTE(review): helpers.request with `uri`/`json` is the legacy
// request-style helper; newer n8n versions prefer helpers.httpRequest —
// verify against the target n8n version.
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const items = this.getInputData();
const logLevel = this.getNodeParameter('logLevel', 0) as string;
const logEntry = {
timestamp: new Date().toISOString(),
workflowId: this.getWorkflow().id,
executionId: this.getExecutionId(),
nodeName: this.getNode().name,
level: logLevel,
itemCount: items.length,
// Full item payloads — may be large or sensitive; consider truncating.
data: items.map(item => item.json)
};
// Send to logging service
await this.helpers.request({
method: 'POST',
uri: 'http://logging-service/logs',
body: logEntry,
json: true
});
return [items];
}
}
Metrics Collection
Example:
// Collect workflow metrics
// First node captures execution metadata into workflow static data; the
// second node ships it to a metrics service.
// NOTE(review): $setWorkflowStaticData / $getWorkflowStaticData("metrics")
// do not match n8n's documented API ($getWorkflowStaticData('global') and
// mutating the returned object) — confirm these helpers exist in the
// target n8n version.
const metricsCollector = {
name: 'Metrics Collector',
nodes: [
{
// Snapshots workflow/execution identifiers, node count and input
// size, then passes items through unchanged.
name: 'Collect Metrics',
type: 'n8n-nodes-base.function',
parameters: {
functionCode: `
const startTime = Date.now();
const metrics = {
workflowId: $workflow.id,
executionId: $execution.id,
startTime: new Date(startTime).toISOString(),
nodeCount: Object.keys($workflow.nodes).length,
itemCount: $input.all().length
};
// Store in context for later
$setWorkflowStaticData('metrics', metrics);
return $input.all();
`
}
},
{
// POSTs the stored metrics to the collection endpoint.
name: 'Send Metrics',
type: 'n8n-nodes-base.httpRequest',
parameters: {
url: 'http://metrics-service/collect',
method: 'POST',
body: '={{$getWorkflowStaticData("metrics")}}'
}
}
]
};
Deployment Strategies
Blue-Green Deployment
Example:
# docker-compose.blue-green.yml
# Blue/green deployment: two n8n instances behind Traefik; the X-Version
# request header selects which instance serves n8n.example.com.
# NOTE(review): both instances must share the same database/credentials
# store for a safe cutover — that configuration is not shown here.
version: '3.8'
services:
n8n-blue:
# Current (stable) release.
image: n8nio/n8n:latest
environment:
- VERSION=blue
# NOTE(review): n8n's documented port variable is N8N_PORT — confirm
# plain PORT is actually honored by the image.
- PORT=5678
labels:
- "traefik.http.routers.n8n-blue.rule=Host(`n8n.example.com`) && Headers(`X-Version`, `blue`)"
n8n-green:
# Candidate (next) release.
image: n8nio/n8n:next
environment:
- VERSION=green
- PORT=5679
labels:
- "traefik.http.routers.n8n-green.rule=Host(`n8n.example.com`) && Headers(`X-Version`, `green`)"
traefik:
# Reverse proxy doing the header-based routing above.
image: traefik:v2.9
command:
- "--api.insecure=true"
- "--providers.docker=true"
ports:
- "80:80"
- "8080:8080"
Best Practices
Workflow Design
- Keep workflows focused and single-purpose
- Use sub-workflows for complex logic
- Implement proper error handling
- Add logging and monitoring nodes
- Document workflow purpose and dependencies
Performance
- Process data in batches
- Use parallel execution where possible
- Implement caching strategies
- Optimize database queries
- Monitor resource usage
Security
- Never hardcode credentials
- Validate all input data
- Implement rate limiting
- Use secure connections (HTTPS/TLS)
- Audit workflow access
Maintenance
- Version control workflows
- Implement automated testing
- Use meaningful node names
- Add comments in function nodes
- Regular backup strategy