Why Observability Matters in n8n
In production, you can’t attach a debugger to see what went wrong. Observability is your window into the system - it tells you not just that something failed, but why, when, and how to fix it.
- Logs: What happened and when
- Metrics: How the system is performing
- Traces: How requests flow through the system
Structured Logging
Building a Production Logging System
Copy
import { IExecuteFunctions, ILogger, INodeExecutionData } from 'n8n-workflow';
import * as winston from 'winston';
/**
 * Example node that wraps its execution in structured, correlated logging.
 *
 * NOTE(review): `execute` is declared with `this: IExecuteFunctions`, yet the
 * body also reads class members (`this.logger`, `this.executeWithLogging`, ...)
 * and helpers that are not defined in this snippet (`generateCorrelationId`,
 * `processItem`). Confirm how `this` is bound before reusing this outside the
 * tutorial.
 */
export class ObservableNode {
  /**
   * Key fragments whose values must never reach the logs. They are stored
   * lower-cased because keys are lower-cased before matching; a mixed-case
   * entry such as 'apiKey' could never match.
   */
  private static readonly SENSITIVE_KEY_FRAGMENTS = ['password', 'apikey', 'token', 'secret'];

  private logger: EnhancedLogger;

  constructor() {
    this.logger = new EnhancedLogger({
      service: 'n8n-custom-node',
      environment: process.env.NODE_ENV || 'development',
    });
  }

  /**
   * Runs the node, logging start, completion (with item count and elapsed
   * milliseconds) and failure (with the serialized error and stack).
   *
   * @returns The processed items wrapped in a single output branch.
   * @throws Re-throws whatever `executeWithLogging` throws, after logging it.
   */
  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    // Create correlation ID for request tracing
    const correlationId = this.generateCorrelationId();
    const context = this.buildExecutionContext(correlationId);

    // Log execution start
    this.logger.info('Node execution started', context);

    try {
      // Execute with detailed logging
      const result = await this.executeWithLogging(context);
      this.logger.info('Node execution completed', {
        ...context,
        itemsProcessed: result.length,
        executionTime: Date.now() - context.startTime,
      });
      return [result];
    } catch (error) {
      this.logger.error('Node execution failed', {
        ...context,
        error: this.serializeError(error),
        stack: error?.stack,
      });
      throw error;
    }
  }

  /**
   * Collects the identifiers and process facts attached to every log line of
   * one execution.
   *
   * @param correlationId Identifier shared by all log entries of this run.
   */
  private buildExecutionContext(correlationId: string): ExecutionContext {
    return {
      correlationId,
      workflowId: this.getWorkflow().id,
      workflowName: this.getWorkflow().name,
      nodeType: this.getNode().type,
      nodeName: this.getNode().name,
      executionId: this.getExecutionId(),
      startTime: Date.now(),
      userId: this.getWorkflowStaticData('userId'),
      environment: {
        nodeVersion: process.version,
        platform: process.platform,
        memoryUsage: process.memoryUsage(),
      },
    };
  }

  /**
   * Processes every input item, logging each one at debug level.
   *
   * A failing item is logged with its sanitized input. When
   * `continueOnFail()` is true an error item is emitted in its place;
   * otherwise the error is re-thrown and processing stops.
   */
  private async executeWithLogging(
    context: ExecutionContext
  ): Promise<INodeExecutionData[]> {
    const items = this.getInputData();
    const results: INodeExecutionData[] = [];

    for (let i = 0; i < items.length; i++) {
      const itemContext = { ...context, itemIndex: i };
      this.logger.debug('Processing item', itemContext);

      try {
        const result = await this.processItem(items[i], itemContext);
        results.push(result);
        this.logger.debug('Item processed successfully', {
          ...itemContext,
          // JSON.stringify yields undefined for an undefined result; do not
          // let the size calculation turn a successful item into a failure.
          resultSize: JSON.stringify(result)?.length ?? 0,
        });
      } catch (error) {
        this.logger.error('Item processing failed', {
          ...itemContext,
          error: this.serializeError(error),
          input: this.sanitizeForLogging(items[i]),
        });

        // Decide whether to continue or fail
        if (this.continueOnFail()) {
          results.push({
            json: { error: error.message },
            error: error,
          });
        } else {
          throw error;
        }
      }
    }

    return results;
  }

  /**
   * Reduces an error to a small, JSON-friendly object.
   * Tolerates `null`/`undefined` so that logging a failure cannot itself throw.
   */
  private serializeError(error: any): any {
    return {
      name: error?.name,
      message: error?.message,
      code: error?.code,
      statusCode: error?.statusCode,
      details: error?.details,
    };
  }

  /**
   * Returns a deep copy of `data` with the value of every key containing a
   * sensitive fragment (case-insensitive) replaced by '[REDACTED]'.
   *
   * This runs inside error handlers, so it never throws: `undefined` input is
   * returned as-is and input that cannot be JSON-serialized (for example a
   * circular structure) yields the placeholder '[UNSERIALIZABLE]'.
   */
  private sanitizeForLogging(data: any): any {
    let sanitized: any;
    try {
      const serialized = JSON.stringify(data);
      if (serialized === undefined) {
        return undefined;
      }
      sanitized = JSON.parse(serialized);
    } catch {
      return '[UNSERIALIZABLE]';
    }

    const redact = (node: any): void => {
      for (const key of Object.keys(node)) {
        const lowered = key.toLowerCase();
        const isSensitive = ObservableNode.SENSITIVE_KEY_FRAGMENTS.some(fragment =>
          lowered.includes(fragment)
        );
        if (isSensitive) {
          node[key] = '[REDACTED]';
        } else if (typeof node[key] === 'object' && node[key] !== null) {
          redact(node[key]);
        }
      }
    };

    if (typeof sanitized === 'object' && sanitized !== null) {
      redact(sanitized);
    }
    return sanitized;
  }
}
/**
 * Thin wrapper around a winston logger that emits timestamped JSON and tags
 * every entry with the configured service and environment.
 *
 * Transports are chosen from the process environment:
 * - `NODE_ENV !== 'production'`: colorized console output.
 * - `NODE_ENV === 'production'`: rotating files under `/var/log/n8n`.
 * - `CLOUDWATCH_ENABLED` set: an additional `winston-cloudwatch` transport.
 */
class EnhancedLogger {
  private winston: winston.Logger;

  /**
   * @param config Service name and environment attached to every log entry.
   */
  constructor(private config: LoggerConfig) {
    this.winston = winston.createLogger({
      level: process.env.LOG_LEVEL || 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.errors({ stack: true }),
        winston.format.json()
      ),
      defaultMeta: {
        service: config.service,
        environment: config.environment,
      },
      transports: this.getTransports(),
    });
  }

  /** Builds the transport list for the current environment (see class docs). */
  private getTransports(): winston.transport[] {
    const transports: winston.transport[] = [];
    const isProduction = process.env.NODE_ENV === 'production';

    if (!isProduction) {
      // Console transport for development
      transports.push(
        new winston.transports.Console({
          format: winston.format.combine(
            winston.format.colorize(),
            winston.format.simple()
          ),
        })
      );
    } else {
      // File transports for production: errors only, plus everything combined
      transports.push(
        new winston.transports.File({
          filename: '/var/log/n8n/error.log',
          level: 'error',
          maxsize: 5242880, // 5MB
          maxFiles: 5,
        }),
        new winston.transports.File({
          filename: '/var/log/n8n/combined.log',
          maxsize: 5242880, // 5MB
          maxFiles: 5,
        })
      );
    }

    // CloudWatch, DataDog, etc.
    // NOTE(review): any non-empty value enables this, including "false".
    if (process.env.CLOUDWATCH_ENABLED) {
      const CloudWatchTransport = require('winston-cloudwatch');
      transports.push(
        new CloudWatchTransport({
          logGroupName: '/aws/n8n',
          // The constructor parameter `config` is not in scope inside this
          // method; the value has to be read from the instance property.
          logStreamName: `${this.config.service}-${Date.now()}`,
          region: process.env.AWS_REGION,
        })
      );
    }

    return transports;
  }

  /** Logs `message` at info level with optional structured metadata. */
  info(message: string, meta?: any): void {
    this.winston.info(message, meta);
  }

  /** Logs `message` at error level with optional structured metadata. */
  error(message: string, meta?: any): void {
    this.winston.error(message, meta);
  }

  /** Logs `message` at warn level with optional structured metadata. */
  warn(message: string, meta?: any): void {
    this.winston.warn(message, meta);
  }

  /** Logs `message` at debug level with optional structured metadata. */
  debug(message: string, meta?: any): void {
    this.winston.debug(message, meta);
  }
}
Log Aggregation Patterns
Copy
/**
 * Example node that buffers structured log entries and ships them in batches
 * to an HTTP log aggregation endpoint.
 *
 * The buffer is flushed every 5 seconds, immediately when an error entry is
 * logged, and whenever it holds 100 entries or more. A failed flush puts the
 * batch back in front of the buffer for the next attempt.
 *
 * NOTE(review): `execute` is declared with `this: IExecuteFunctions` but also
 * calls class members and helpers not defined in this snippet
 * (`getExecutionContext`, `performOperation`, `calculateDuration`).
 */
export class LogAggregatorNode {
  /** Buffer size at which a flush is triggered without waiting for the timer. */
  private static readonly FLUSH_THRESHOLD = 100;
  /**
   * Upper bound on retained entries. While the endpoint is unreachable every
   * failed batch is re-queued, so without a cap the buffer grows forever.
   */
  private static readonly MAX_BUFFERED_ENTRIES = 1000;

  private logBuffer: LogEntry[] = [];
  private flushInterval: NodeJS.Timeout;

  constructor() {
    // Batch logs for efficient shipping
    this.flushInterval = setInterval(() => {
      void this.flushLogs();
    }, 5000); // Flush every 5 seconds
  }

  /**
   * Performs the configured operation, logging its start, completion and
   * failure as structured entries.
   *
   * @throws Re-throws the operation's error after logging it.
   */
  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    const operation = this.getNodeParameter('operation', 0) as string;

    // Structured logging with context
    this.log({
      level: 'info',
      message: 'Operation started',
      operation,
      timestamp: new Date().toISOString(),
      context: this.getExecutionContext(),
    });

    try {
      const result = await this.performOperation(operation);
      this.log({
        level: 'info',
        message: 'Operation completed',
        operation,
        duration: this.calculateDuration(),
        resultCount: result.length,
      });
      return [result];
    } catch (error) {
      this.log({
        level: 'error',
        message: 'Operation failed',
        operation,
        error: error.message,
        stack: error.stack,
      });
      throw error;
    }
  }

  /** Buffers an entry; flushes right away for errors or a full buffer. */
  private log(entry: LogEntry): void {
    this.logBuffer.push(entry);

    const bufferIsFull = this.logBuffer.length >= LogAggregatorNode.FLUSH_THRESHOLD;
    if (entry.level === 'error' || bufferIsFull) {
      void this.flushLogs();
    }
  }

  /**
   * Ships the buffered entries. Never rejects: on failure the batch is
   * re-queued ahead of newer entries and the oldest entries beyond
   * `MAX_BUFFERED_ENTRIES` are dropped.
   */
  private async flushLogs(): Promise<void> {
    if (this.logBuffer.length === 0) return;

    // Swap the buffer first so entries logged during the await are not lost.
    const batch = this.logBuffer;
    this.logBuffer = [];

    try {
      // Send to log aggregation service
      await this.sendToLogService(batch);
    } catch (error) {
      console.error('Failed to flush logs:', error);
      // Re-add logs to buffer for retry, keeping only the newest entries
      this.logBuffer = [...batch, ...this.logBuffer].slice(
        -LogAggregatorNode.MAX_BUFFERED_ENTRIES
      );
    }
  }

  /**
   * POSTs a batch to `LOG_AGGREGATOR_ENDPOINT`, or prints it to the console
   * when no endpoint is configured.
   *
   * @throws Error when the request fails or the service answers with a
   *   non-2xx status. `fetch` resolves for HTTP error responses, so without
   *   this check rejected batches would be silently discarded.
   */
  private async sendToLogService(logs: LogEntry[]): Promise<void> {
    // Send to ELK Stack, Datadog, Splunk, etc.
    const endpoint = process.env.LOG_AGGREGATOR_ENDPOINT;
    if (!endpoint) {
      // Fallback to console
      logs.forEach(log => console.log(JSON.stringify(log)));
      return;
    }

    const headers: Record<string, string> = {
      'Content-Type': 'application/json',
    };
    const apiKey = process.env.LOG_AGGREGATOR_API_KEY;
    if (apiKey) {
      // Only send the header when a key is actually configured.
      headers['X-API-Key'] = apiKey;
    }

    const response = await fetch(endpoint, {
      method: 'POST',
      headers,
      body: JSON.stringify({
        logs,
        source: 'n8n-custom-node',
        hostname: require('os').hostname(),
      }),
    });

    if (!response.ok) {
      throw new Error(
        `Log service responded with ${response.status} ${response.statusText}`
      );
    }
  }

  /** Stops the periodic flush and starts one last, un-awaited flush. */
  destroy(): void {
    clearInterval(this.flushInterval);
    void this.flushLogs(); // Final flush
  }
}
Metrics Collection
Custom Metrics Implementation
Metrics are not logs. Logs tell you what happened; metrics tell you how much and how fast. Use both for complete observability.
Copy
import { StatsD } from 'node-statsd';
import * as promClient from 'prom-client';
/**
 * Example node that reports execution metrics to both StatsD and a dedicated
 * Prometheus registry, optionally exposing the registry over HTTP when
 * `METRICS_PORT` is set.
 *
 * NOTE(review): `execute` is declared with `this: IExecuteFunctions` but reads
 * class members (`this.metrics`, `this.statsd`) and a helper that is not
 * defined in this snippet (`processItem`).
 */
export class MetricsNode {
  private statsd: StatsD;
  private promRegistry: promClient.Registry;
  private metrics: NodeMetrics;

  constructor() {
    // Initialize StatsD client
    this.statsd = new StatsD({
      host: process.env.STATSD_HOST || 'localhost',
      port: parseInt(process.env.STATSD_PORT || '8125', 10),
      prefix: 'n8n.custom_node.',
    });

    // Initialize Prometheus registry
    this.promRegistry = new promClient.Registry();

    // Define metrics
    this.metrics = {
      executionCounter: new promClient.Counter({
        name: 'node_executions_total',
        help: 'Total number of node executions',
        labelNames: ['status', 'node_type'],
        registers: [this.promRegistry],
      }),
      executionDuration: new promClient.Histogram({
        name: 'node_execution_duration_seconds',
        help: 'Duration of node executions',
        labelNames: ['node_type'],
        buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60],
        registers: [this.promRegistry],
      }),
      itemsProcessed: new promClient.Counter({
        name: 'items_processed_total',
        help: 'Total number of items processed',
        labelNames: ['node_type', 'result'],
        registers: [this.promRegistry],
      }),
      memoryUsage: new promClient.Gauge({
        name: 'node_memory_usage_bytes',
        help: 'Memory usage of the node',
        labelNames: ['type'],
        registers: [this.promRegistry],
      }),
      activeExecutions: new promClient.Gauge({
        name: 'active_executions',
        help: 'Number of currently active executions',
        registers: [this.promRegistry],
      }),
    };

    // Start metrics collection
    this.startMetricsCollection();
  }

  /**
   * Runs the node and records success/error counters, item counts, heap
   * usage and the execution duration.
   *
   * The duration histogram is observed exactly once per execution, whether
   * it succeeds or fails.
   *
   * @throws Re-throws the execution error after counting it.
   */
  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    const nodeType = this.getNode().type;
    const stopTimer = this.metrics.executionDuration.startTimer({
      node_type: nodeType,
    });

    // Each call to the function returned by startTimer records another
    // observation, so remember the first result and reuse it.
    let elapsedSeconds: number | undefined;
    const observeOnce = (): number => {
      if (elapsedSeconds === undefined) {
        elapsedSeconds = stopTimer();
      }
      return elapsedSeconds;
    };

    // Track active execution
    this.metrics.activeExecutions.inc();

    try {
      // Track memory before execution
      const memBefore = process.memoryUsage();
      const result = await this.executeWithMetrics();

      // Track memory after execution
      const memAfter = process.memoryUsage();
      const memDelta = memAfter.heapUsed - memBefore.heapUsed;

      // Record metrics
      this.metrics.executionCounter.inc({
        status: 'success',
        node_type: nodeType,
      });
      this.metrics.itemsProcessed.inc(
        {
          node_type: nodeType,
          result: 'success',
        },
        result.length
      );
      this.metrics.memoryUsage.set({ type: 'heap_used' }, memAfter.heapUsed);

      // Send to StatsD. The Prometheus timer reports seconds, while StatsD
      // timings are expressed in milliseconds.
      this.statsd.increment('execution.success');
      this.statsd.timing('execution.duration', Math.round(observeOnce() * 1000));
      this.statsd.gauge('memory.delta', memDelta);
      this.statsd.histogram('items.count', result.length);

      return [result];
    } catch (error) {
      this.metrics.executionCounter.inc({
        status: 'error',
        node_type: nodeType,
      });
      this.statsd.increment('execution.error');
      this.statsd.increment(`error.${error.code || 'unknown'}`);
      throw error;
    } finally {
      // Records the duration for failed runs; no-op after a successful one.
      observeOnce();
      this.metrics.activeExecutions.dec();
    }
  }

  /**
   * Registers prom-client's default process metrics on this registry and,
   * when `METRICS_PORT` is set, serves the registry at `/metrics`.
   */
  private startMetricsCollection(): void {
    // Collect default metrics (CPU, memory, etc.)
    promClient.collectDefaultMetrics({
      register: this.promRegistry,
      prefix: 'n8n_node_',
    });

    // Export metrics endpoint for Prometheus scraping
    if (process.env.METRICS_PORT) {
      const express = require('express');
      const app = express();

      app.get('/metrics', async (req, res) => {
        res.set('Content-Type', this.promRegistry.contentType);
        const metrics = await this.promRegistry.metrics();
        res.end(metrics);
      });

      app.listen(process.env.METRICS_PORT, () => {
        console.log(`Metrics available at :${process.env.METRICS_PORT}/metrics`);
      });
    }
  }

  /**
   * Processes the input items one by one, timing each item and reporting the
   * processing rate to StatsD every 100 processed items.
   *
   * A failing item is counted with `result: 'error'` and produces no output
   * item; its error is not re-thrown.
   */
  private async executeWithMetrics(): Promise<INodeExecutionData[]> {
    const items = this.getInputData();
    const results: INodeExecutionData[] = [];

    // Track processing rate
    const startTime = Date.now();
    let processedCount = 0;

    for (const item of items) {
      // Per-item timings share the execution histogram under a "<type>_item" label.
      const itemTimer = this.metrics.executionDuration.startTimer({
        node_type: `${this.getNode().type}_item`,
      });

      try {
        const result = await this.processItem(item);
        results.push(result);
        processedCount++;

        // Report processing rate periodically
        if (processedCount % 100 === 0) {
          const rate = processedCount / ((Date.now() - startTime) / 1000);
          this.statsd.gauge('processing.rate', rate);
        }
      } catch (error) {
        this.metrics.itemsProcessed.inc(
          {
            node_type: this.getNode().type,
            result: 'error',
          },
          1
        );
      } finally {
        itemTimer();
      }
    }

    return results;
  }
}
/** The Prometheus instruments that MetricsNode creates on its registry. */
interface NodeMetrics {
// Counter 'node_executions_total', labelled by status and node_type.
executionCounter: promClient.Counter<string>;
// Histogram 'node_execution_duration_seconds', labelled by node_type.
executionDuration: promClient.Histogram<string>;
// Counter 'items_processed_total', labelled by node_type and result.
itemsProcessed: promClient.Counter<string>;
// Gauge 'node_memory_usage_bytes', labelled by type.
memoryUsage: promClient.Gauge<string>;
// Gauge 'active_executions'; incremented on entry to execute, decremented on exit.
activeExecutions: promClient.Gauge<string>;
}
Distributed Tracing
OpenTelemetry Integration
Copy
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import * as api from '@opentelemetry/api';
/**
 * Example node that traces its execution with OpenTelemetry: one root span
 * per execution, one span per item, and child spans for validation and
 * transformation.
 *
 * NOTE(review): the constructor starts a NodeSDK for every instance, and
 * `execute` is declared with `this: IExecuteFunctions` while also using class
 * members and helpers not defined in this snippet (`validateItem`,
 * `transformItem`, `getIncomingHeader`). Confirm both before reusing this
 * outside the tutorial.
 */
export class TracedNode {
  private tracer: api.Tracer;

  constructor() {
    // Initialize OpenTelemetry
    const sdk = new NodeSDK({
      resource: new Resource({
        [SemanticResourceAttributes.SERVICE_NAME]: 'n8n-custom-node',
        [SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
      }),
      instrumentations: [getNodeAutoInstrumentations()],
    });
    sdk.start();

    this.tracer = api.trace.getTracer('n8n-custom-node', '1.0.0');
  }

  /**
   * Runs the node inside a `node.execute` span that carries node/workflow
   * attributes and, when an incoming `traceparent` is available, a link to
   * the upstream span.
   *
   * @throws Re-throws the execution error after recording it on the span.
   */
  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    // Start root span
    return await this.tracer.startActiveSpan('node.execute', async (span) => {
      try {
        // Add span attributes
        span.setAttributes({
          'node.type': this.getNode().type,
          'node.name': this.getNode().name,
          'workflow.id': this.getWorkflow().id,
          'execution.id': this.getExecutionId(),
        });

        // A link needs the upstream SpanContext, not the whole Context, so
        // unwrap it; skip the link when the extracted context has no span.
        const parentContext = this.extractParentContext();
        const upstreamSpanContext = parentContext
          ? api.trace.getSpanContext(parentContext)
          : undefined;
        if (upstreamSpanContext) {
          span.addLink({ context: upstreamSpanContext });
        }

        const result = await this.executeWithTracing(span);
        span.setStatus({ code: api.SpanStatusCode.OK });
        return [result];
      } catch (error) {
        span.recordException(error);
        span.setStatus({
          code: api.SpanStatusCode.ERROR,
          message: error.message,
        });
        throw error;
      } finally {
        span.end();
      }
    });
  }

  /**
   * Processes each input item inside its own `item.process` span.
   *
   * A failing item is recorded on its span and produces no output item; the
   * error is not re-thrown.
   *
   * @param parentSpan The execution's root span (kept for interface
   *   compatibility; item spans are parented through the active context).
   */
  private async executeWithTracing(
    parentSpan: api.Span
  ): Promise<INodeExecutionData[]> {
    const items = this.getInputData();
    const results: INodeExecutionData[] = [];

    for (let i = 0; i < items.length; i++) {
      await this.tracer.startActiveSpan(
        'item.process',
        { attributes: { 'item.index': i } },
        async (span) => {
          try {
            // Add baggage for cross-service propagation. Baggage is
            // immutable: setEntry returns a new instance, which only takes
            // effect once it is attached to a context that work runs in.
            const activeContext = api.context.active();
            const baggage = (
              api.propagation.getBaggage(activeContext) ??
              api.propagation.createBaggage()
            ).setEntry('item.id', { value: String(i) });
            const contextWithBaggage = api.propagation.setBaggage(
              activeContext,
              baggage
            );

            const result = await api.context.with(contextWithBaggage, () =>
              this.processItemWithTracing(items[i], span)
            );
            results.push(result);
            span.setStatus({ code: api.SpanStatusCode.OK });
          } catch (error) {
            span.recordException(error);
            span.setStatus({
              code: api.SpanStatusCode.ERROR,
              message: error.message,
            });
          } finally {
            span.end();
          }
        }
      );
    }

    return results;
  }

  /**
   * Validates and transforms one item, each step in its own child span of
   * `parentSpan`.
   *
   * @returns The transformed item.
   */
  private async processItemWithTracing(
    item: INodeExecutionData,
    parentSpan: api.Span
  ): Promise<INodeExecutionData> {
    // The parent of a span is taken from the context passed as the third
    // argument to startSpan; SpanOptions has no `parent` field.
    const parentContext = api.trace.setSpan(api.context.active(), parentSpan);

    // Create child spans for sub-operations
    const validationSpan = this.tracer.startSpan('validate', {}, parentContext);
    try {
      await this.validateItem(item);
    } finally {
      validationSpan.end();
    }

    const transformSpan = this.tracer.startSpan('transform', {}, parentContext);
    try {
      return await this.transformItem(item);
    } finally {
      transformSpan.end();
    }
  }

  /**
   * Builds a context from the incoming `traceparent` header.
   *
   * @returns The extracted context, or `null` when no header is present.
   */
  private extractParentContext(): api.Context | null {
    // Extract trace context from headers or metadata
    const traceHeader = this.getIncomingHeader('traceparent');
    if (!traceHeader) {
      return null;
    }
    return api.propagation.extract(api.context.active(), {
      traceparent: traceHeader,
    });
  }
}
Advanced Debugging Techniques
Remote Debugging Setup
Copy
/**
 * Example node with debugging aids: conditional breakpoints driven by
 * environment variables and a JSON snapshot written to disk when execution
 * fails.
 *
 * NOTE(review): `execute` is declared with `this: IExecuteFunctions` but also
 * calls class members and helpers not defined in this snippet
 * (`executeWithDebugging`, `getErrorRate`, `generateSessionId`,
 * `captureInputs`, `captureEnvironment`, `getExecutionTimeline`,
 * `sendToDebugService`).
 */
export class DebuggableNode {
  /** Only created when `ENABLE_REMOTE_DEBUG` is set. */
  private debugger?: RemoteDebugger;

  constructor() {
    if (process.env.ENABLE_REMOTE_DEBUG) {
      this.debugger = new RemoteDebugger({
        port: parseInt(process.env.DEBUG_PORT || '9229', 10),
        breakOnStart: process.env.BREAK_ON_START === 'true',
      });
    }
  }

  /**
   * Runs the node, pausing first if a configured break condition matches,
   * and captures a debug snapshot when execution fails.
   *
   * @throws Always re-throws the original execution error; a failure while
   *   capturing the snapshot is logged and never replaces it.
   */
  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    // Conditional breakpoint
    if (this.shouldBreak()) {
      debugger; // Will pause here if inspector is attached
    }

    // Debug context injection
    const debugContext = this.injectDebugContext();

    try {
      const result = await this.executeWithDebugging(debugContext);
      return [result];
    } catch (error) {
      // Capture debug snapshot on error. Snapshotting touches the file
      // system and possibly the network; if it fails, the caller must still
      // see the error that actually broke the execution.
      try {
        await this.captureDebugSnapshot(error, debugContext);
      } catch (snapshotError) {
        console.error('Failed to capture debug snapshot:', snapshotError);
      }
      throw error;
    }
  }

  /**
   * Evaluates the comma-separated conditions in `DEBUG_CONDITIONS`.
   *
   * @returns `true` when at least one known condition currently holds.
   */
  private shouldBreak(): boolean {
    // Break on specific conditions
    const breakConditions = process.env.DEBUG_CONDITIONS?.split(',') || [];

    return breakConditions.some(condition => {
      switch (condition) {
        case 'large_payload':
          return JSON.stringify(this.getInputData()).length > 10000;
        case 'specific_node':
          return this.getNode().name === process.env.DEBUG_NODE_NAME;
        case 'error_rate_high':
          return this.getErrorRate() > 0.1;
        default:
          return false;
      }
    });
  }

  /** Gathers the session id, inputs, environment and call stack for this run. */
  private injectDebugContext(): DebugContext {
    return {
      sessionId: this.generateSessionId(),
      timestamp: Date.now(),
      inputs: this.captureInputs(),
      environment: this.captureEnvironment(),
      stack: new Error().stack,
    };
  }

  /**
   * Assembles a snapshot of the error, the debug context and process state,
   * writes it to disk and, when `DEBUG_SERVICE_URL` is set, sends it to the
   * debug service.
   *
   * NOTE(review): the snapshot embeds captured inputs and environment; make
   * sure neither contains secrets before enabling this in production.
   */
  private async captureDebugSnapshot(
    error: any,
    context: DebugContext
  ): Promise<void> {
    const snapshot = {
      error: {
        message: error.message,
        stack: error.stack,
        code: error.code,
      },
      context,
      state: {
        memory: process.memoryUsage(),
        cpu: process.cpuUsage(),
        uptime: process.uptime(),
      },
      variables: this.captureLocalVariables(),
      timeline: this.getExecutionTimeline(),
    };

    // Save snapshot for analysis
    await this.saveDebugSnapshot(snapshot);

    // Send to debug service if configured
    if (process.env.DEBUG_SERVICE_URL) {
      await this.sendToDebugService(snapshot);
    }
  }

  /**
   * Asks the V8 inspector to evaluate `Object.keys(this)` and returns the
   * result under `scope`.
   *
   * NOTE(review): `variables` is returned right after `session.post`; if the
   * inspector invokes the callback later, the returned object is still empty
   * at that point. Confirm the callback timing for in-process sessions.
   */
  private captureLocalVariables(): any {
    // Use V8 inspector to capture local scope
    const inspector = require('inspector');
    const session = new inspector.Session();
    session.connect();

    const variables: any = {};
    session.post(
      'Runtime.evaluate',
      {
        expression: 'Object.keys(this)',
        includeCommandLineAPI: true,
      },
      (err, result) => {
        if (!err && result.result) {
          variables.scope = result.result;
        }
      }
    );

    session.disconnect();
    return variables;
  }

  /**
   * Writes the snapshot as pretty-printed JSON into `DEBUG_OUTPUT_DIR`
   * (default `/tmp/n8n-debug`), creating the directory if needed.
   */
  private async saveDebugSnapshot(snapshot: any): Promise<void> {
    const fs = require('fs').promises;
    const path = require('path');

    const debugDir = process.env.DEBUG_OUTPUT_DIR || '/tmp/n8n-debug';
    await fs.mkdir(debugDir, { recursive: true });

    const filename = `debug-${snapshot.context.sessionId}-${Date.now()}.json`;
    const filepath = path.join(debugDir, filename);
    await fs.writeFile(filepath, JSON.stringify(snapshot, null, 2));

    console.log(`Debug snapshot saved to: ${filepath}`);
  }
}
/**
 * Opens the Node.js inspector and blocks until a debugger attaches when
 * `breakOnStart` is set; does nothing otherwise.
 *
 * The inspector gives an attached client full code execution in this
 * process, so it listens on the loopback interface by default. Set
 * `DEBUG_HOST` explicitly (for example to `0.0.0.0`) only on a network where
 * the port is protected, or reach it through an SSH tunnel instead.
 */
class RemoteDebugger {
  /**
   * @param config Inspector port and whether to pause until a client attaches.
   */
  constructor(private config: DebugConfig) {
    if (config.breakOnStart) {
      // Start inspector and wait for debugger
      const inspector = require('inspector');
      const host = process.env.DEBUG_HOST || '127.0.0.1';

      // url() is undefined while the inspector is inactive; skip open() when
      // it is already listening so that a second instance does not fail.
      if (!inspector.url()) {
        inspector.open(config.port, host, true);
      }
      console.log(`Debugger listening on port ${config.port}`);
      debugger; // Pause execution
    }
  }
}
Performance Profiling
Copy
/**
 * Example node that profiles its own execution: a CPU profile around the
 * run plus heap snapshots before and after it.
 *
 * NOTE(review): `this.logger` is used below but not declared in this
 * snippet, and `execute` is declared with `this: IExecuteFunctions` while
 * calling class members and helpers not defined here
 * (`executeWithProfiling`, `compareHeapSnapshots`).
 */
export class ProfiledNode {
  private profiler: Profiler;

  constructor() {
    this.profiler = new Profiler({
      enabled: process.env.PROFILING_ENABLED === 'true',
      sampleRate: parseInt(process.env.PROFILING_SAMPLE_RATE || '100', 10),
    });
  }

  /**
   * Runs the node under a CPU profile, analyses the profile on success and
   * compares heap snapshots taken before and after the run.
   *
   * The CPU profile is always stopped, including when execution throws, so
   * a failed run does not leave a profiling session open.
   */
  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    // Start CPU profiling
    const cpuProfile = await this.profiler.startCPUProfile('node-execution');
    let profileStopped = false;

    // Start heap profiling
    const heapSnapshot = this.profiler.takeHeapSnapshot('before');

    try {
      const result = await this.executeWithProfiling();

      // Stop profiling and analyze
      profileStopped = true;
      const profile = await this.profiler.stopCPUProfile(cpuProfile);
      await this.analyzeProfile(profile);

      return [result];
    } finally {
      if (!profileStopped) {
        // Execution failed before the profile was stopped: stop and discard
        // it without letting a profiler problem hide the execution error.
        try {
          await this.profiler.stopCPUProfile(cpuProfile);
        } catch (stopError) {
          console.error('Failed to stop CPU profile:', stopError);
        }
      }

      // Take final heap snapshot
      const finalHeap = this.profiler.takeHeapSnapshot('after');
      await this.compareHeapSnapshots(heapSnapshot, finalHeap);
    }
  }

  /**
   * Logs the ten functions with the most sample hits and any detected
   * performance issues.
   *
   * Does nothing for a profile without nodes, which is what the profiler
   * returns while profiling is disabled.
   */
  private async analyzeProfile(profile: CPUProfile): Promise<void> {
    const nodes = profile?.nodes ?? [];
    if (nodes.length === 0) {
      return;
    }
    const sampleCount = profile.samples?.length ?? 0;

    // Find hot spots. Sort a copy so the caller's profile is left untouched.
    const hotFunctions = [...nodes]
      .sort((a, b) => b.hitCount - a.hitCount)
      .slice(0, 10);

    this.logger.info('CPU Profile Hot Spots:', {
      totalTime: profile.duration,
      hotFunctions: hotFunctions.map(f => ({
        name: f.functionName,
        hits: f.hitCount,
        percentage: sampleCount > 0 ? (f.hitCount / sampleCount) * 100 : 0,
      })),
    });

    // Check for performance issues
    const issues = this.detectPerformanceIssues(profile);
    if (issues.length > 0) {
      this.logger.warn('Performance issues detected:', issues);
    }
  }

  /**
   * Scans a CPU profile for two warning signs: functions whose name contains
   * 'GC' taking more than 10% of the samples, and any function whose name
   * contains a synchronous I/O call (`readFileSync`, `writeFileSync`,
   * `execSync`).
   *
   * @returns Human-readable issue descriptions; empty for an empty profile.
   */
  private detectPerformanceIssues(profile: CPUProfile): string[] {
    const issues: string[] = [];
    const nodes = profile?.nodes ?? [];
    const sampleCount = profile?.samples?.length ?? 0;
    const nameOf = (n: { functionName?: string }): string => n.functionName ?? '';

    // Check for excessive GC
    const gcTime = nodes
      .filter(n => nameOf(n).includes('GC'))
      .reduce((sum, n) => sum + n.hitCount, 0);
    if (sampleCount > 0 && gcTime / sampleCount > 0.1) {
      issues.push('Excessive garbage collection (>10% of CPU time)');
    }

    // Check for blocking operations
    const blockingCalls = ['readFileSync', 'writeFileSync', 'execSync'];
    const hasBlockingOps = nodes.some(n =>
      blockingCalls.some(op => nameOf(n).includes(op))
    );
    if (hasBlockingOps) {
      issues.push('Blocking I/O operations detected');
    }

    return issues;
  }
}
/**
 * Wrapper around `v8-profiler-next` that turns every operation into a no-op
 * while profiling is disabled, so callers do not need to check the flag.
 */
class Profiler {
  private v8Profiler: any;

  /**
   * @param config Profiling options; the native module is only loaded when
   *   `config.enabled` is true.
   */
  constructor(private config: ProfilerConfig) {
    if (config.enabled) {
      this.v8Profiler = require('v8-profiler-next');
      // NOTE(review): generate type 1 is presumably the newer .cpuprofile
      // layout (flat `nodes`/`samples`) rather than the legacy tree; confirm
      // against the v8-profiler-next README.
      this.v8Profiler.setGenerateType(1);
    }
  }

  /**
   * Starts a CPU profile.
   *
   * @param title Prefix of the generated profile id.
   * @returns The profile id, or an empty string while profiling is disabled.
   */
  async startCPUProfile(title: string): Promise<string> {
    if (!this.config.enabled) return '';

    const id = `${title}-${Date.now()}`;
    this.v8Profiler.startProfiling(id, true);
    return id;
  }

  /**
   * Stops the profile started under `id` and returns its exported content.
   *
   * The native profile is always released, even when exporting fails.
   *
   * @returns The parsed profile, or an empty object when profiling is
   *   disabled, no profile exists for `id`, or the export fails.
   */
  async stopCPUProfile(id: string): Promise<CPUProfile> {
    if (!this.config.enabled) return {} as CPUProfile;

    const profile = this.v8Profiler.stopProfiling(id);
    if (!profile) return {} as CPUProfile;

    try {
      return await this.exportProfile(profile);
    } finally {
      profile.delete();
    }
  }

  /**
   * Takes a heap snapshot.
   *
   * @returns The snapshot handle, or `null` while profiling is disabled.
   */
  takeHeapSnapshot(title: string): any {
    if (!this.config.enabled) return null;
    return this.v8Profiler.takeSnapshot(title);
  }

  /**
   * Exports a native profile and parses it as JSON. Never rejects: export
   * and parse failures are logged and yield an empty profile.
   */
  private async exportProfile(profile: any): Promise<CPUProfile> {
    return new Promise((resolve) => {
      profile.export((error: any, result: string) => {
        if (error) {
          console.error('Profile export failed:', error);
          resolve({} as CPUProfile);
          return;
        }
        try {
          resolve(JSON.parse(result));
        } catch (parseError) {
          // A throw inside this callback would otherwise escape the promise.
          console.error('Profile export failed:', parseError);
          resolve({} as CPUProfile);
        }
      });
    });
  }
}
Production Monitoring Dashboard
Real-time Monitoring
Copy
/**
 * Pushes a metrics snapshot to a monitoring dashboard over WebSocket once
 * per second. Monitoring only starts when `MONITORING_WEBSOCKET_URL` is set.
 *
 * Call `destroy()` when the dashboard is no longer needed; otherwise the
 * timer and the socket stay alive for the lifetime of the process.
 */
export class MonitoringDashboard {
  private metrics: RealTimeMetrics;
  /** Only present when `MONITORING_WEBSOCKET_URL` is configured. */
  private websocket?: WebSocket;
  /** Handle of the once-per-second snapshot timer, kept so it can be stopped. */
  private monitorTimer?: ReturnType<typeof setInterval>;

  constructor() {
    this.metrics = new RealTimeMetrics();

    // Set up WebSocket for real-time updates
    if (process.env.MONITORING_WEBSOCKET_URL) {
      this.websocket = new WebSocket(process.env.MONITORING_WEBSOCKET_URL);
      this.startRealtimeMonitoring();
    }
  }

  /** Starts the timer that builds and sends a snapshot every second. */
  private startRealtimeMonitoring(): void {
    this.monitorTimer = setInterval(() => {
      const snapshot = {
        timestamp: new Date().toISOString(),
        node: {
          type: 'custom-node',
          version: '1.0.0',
        },
        performance: {
          cpu: process.cpuUsage(),
          memory: process.memoryUsage(),
          eventLoop: this.measureEventLoopLag(),
        },
        throughput: {
          requestsPerSecond: this.metrics.getRequestRate(),
          itemsPerSecond: this.metrics.getItemRate(),
          bytesPerSecond: this.metrics.getThroughput(),
        },
        errors: {
          rate: this.metrics.getErrorRate(),
          recent: this.metrics.getRecentErrors(),
        },
        latency: {
          p50: this.metrics.getPercentile(50),
          p95: this.metrics.getPercentile(95),
          p99: this.metrics.getPercentile(99),
        },
      };

      // Send to dashboard; snapshots are skipped while the socket is not open
      const socket = this.websocket;
      if (socket && socket.readyState === WebSocket.OPEN) {
        socket.send(JSON.stringify(snapshot));
      }
    }, 1000); // Update every second
  }

  /**
   * Schedules a new event-loop lag sample and returns the average of the
   * samples recorded so far. The sample scheduled by this call is therefore
   * only reflected in later return values.
   *
   * @returns Average lag in milliseconds.
   */
  private measureEventLoopLag(): number {
    const start = process.hrtime.bigint();
    setImmediate(() => {
      const lag = Number(process.hrtime.bigint() - start) / 1000000;
      this.metrics.recordEventLoopLag(lag);
    });
    return this.metrics.getAverageEventLoopLag();
  }

  /** Stops the snapshot timer and closes the socket. Safe to call repeatedly. */
  destroy(): void {
    if (this.monitorTimer !== undefined) {
      clearInterval(this.monitorTimer);
      this.monitorTimer = undefined;
    }
    if (this.websocket) {
      this.websocket.close();
      this.websocket = undefined;
    }
  }
}
/**
 * In-memory, sliding-window metrics store used by the monitoring dashboard.
 *
 * Requests, items and errors are stored as timestamps and pruned to the
 * last `windowSize` seconds; latency, event-loop lag and byte samples are
 * kept in fixed-size buffers.
 */
class RealTimeMetrics {
  /** Number of most recent byte samples averaged by `getThroughput`. */
  private static readonly THROUGHPUT_SAMPLES = 10;

  private requests: number[] = [];
  private items: number[] = [];
  private bytes: number[] = [];
  private errors: Array<{ timestamp: number; error: string }> = [];
  private latencies: number[] = [];
  private eventLoopLags: number[] = [];
  private windowSize = 60; // 60 second window

  /** Records that one request happened now. */
  recordRequest(): void {
    this.requests.push(Date.now());
    this.cleanOldData();
  }

  /**
   * Records that `count` items were processed now.
   * Non-positive counts record nothing.
   */
  recordItems(count: number): void {
    const now = Date.now();
    // Plain loop instead of push(...Array(count)): spreading a very large
    // array can exceed the engine's argument limit.
    for (let i = 0; i < count; i++) {
      this.items.push(now);
    }
    this.cleanOldData();
  }

  /** Records one byte-count sample; only the most recent samples are kept. */
  recordBytes(bytes: number): void {
    this.bytes.push(bytes);
    // Only the last THROUGHPUT_SAMPLES entries are ever read, so older ones
    // are dropped to keep the buffer bounded.
    if (this.bytes.length > RealTimeMetrics.THROUGHPUT_SAMPLES) {
      this.bytes.shift();
    }
    this.cleanOldData();
  }

  /** Records an error (its message, or its string form) with the current time. */
  recordError(error: any): void {
    this.errors.push({
      timestamp: Date.now(),
      error: error?.message ?? String(error),
    });
    this.cleanOldData();
  }

  /** Records a latency sample in milliseconds; keeps the latest 1000. */
  recordLatency(ms: number): void {
    this.latencies.push(ms);
    if (this.latencies.length > 1000) {
      this.latencies.shift();
    }
  }

  /** Records an event-loop lag sample in milliseconds; keeps the latest 100. */
  recordEventLoopLag(ms: number): void {
    this.eventLoopLags.push(ms);
    if (this.eventLoopLags.length > 100) {
      this.eventLoopLags.shift();
    }
  }

  /** @returns Number of requests recorded during the last second. */
  getRequestRate(): number {
    const now = Date.now();
    return this.requests.filter(t => now - t < 1000).length;
  }

  /** @returns Number of items recorded during the last second. */
  getItemRate(): number {
    const now = Date.now();
    return this.items.filter(t => now - t < 1000).length;
  }

  /**
   * @returns Average of the most recent byte samples, or 0 when none have
   *   been recorded (instead of the NaN a 0/0 division would produce).
   */
  getThroughput(): number {
    const recentBytes = this.bytes.slice(-RealTimeMetrics.THROUGHPUT_SAMPLES);
    if (recentBytes.length === 0) return 0;
    return recentBytes.reduce((a, b) => a + b, 0) / recentBytes.length;
  }

  /**
   * @returns Errors divided by requests over the last 60 seconds, or 0 when
   *   there were no requests.
   */
  getErrorRate(): number {
    const now = Date.now();
    const recentErrors = this.errors.filter(e => now - e.timestamp < 60000);
    const recentRequests = this.requests.filter(t => now - t < 60000);
    return recentRequests.length > 0 ? recentErrors.length / recentRequests.length : 0;
  }

  /** @returns The five most recently recorded errors, oldest first. */
  getRecentErrors(): any[] {
    return this.errors.slice(-5);
  }

  /**
   * Nearest-rank percentile of the recorded latencies.
   *
   * @param p Percentile in the range 0-100; values outside it are clamped to
   *   the smallest/largest sample.
   * @returns The latency at that percentile, or 0 when nothing is recorded.
   */
  getPercentile(p: number): number {
    if (this.latencies.length === 0) return 0;
    const sorted = [...this.latencies].sort((a, b) => a - b);
    const rank = Math.ceil((p / 100) * sorted.length) - 1;
    // p = 0 would otherwise index -1 and return undefined.
    const index = Math.min(sorted.length - 1, Math.max(0, rank));
    return sorted[index];
  }

  /** @returns Mean of the recorded event-loop lag samples, or 0 when empty. */
  getAverageEventLoopLag(): number {
    if (this.eventLoopLags.length === 0) return 0;
    return this.eventLoopLags.reduce((a, b) => a + b, 0) / this.eventLoopLags.length;
  }

  /** Drops request, item and error records older than the window. */
  private cleanOldData(): void {
    const cutoff = Date.now() - this.windowSize * 1000;
    this.requests = this.requests.filter(t => t > cutoff);
    this.items = this.items.filter(t => t > cutoff);
    this.errors = this.errors.filter(e => e.timestamp > cutoff);
  }
}
Best Practices Summary
Observability Checklist
Production Observability Requirements
Production Observability Requirements
-
Structured Logging
- Use JSON format for machine parsing
- Include correlation IDs for tracing
- Sanitize sensitive data
- Set appropriate log levels
-
Comprehensive Metrics
- Track business metrics (items processed, success rate)
- Monitor system metrics (CPU, memory, latency)
- Use histograms for distributions
- Export for Prometheus/Grafana
-
Distributed Tracing
- Implement OpenTelemetry
- Propagate trace context
- Create meaningful spans
- Include relevant attributes
-
Error Tracking
- Capture full error context
- Include stack traces in development
- Group similar errors
- Alert on error rate thresholds
-
Performance Monitoring
- Profile CPU and memory usage
- Identify bottlenecks
- Track event loop lag
- Monitor garbage collection
-
Debugging Support
- Remote debugging capability
- Debug snapshots on errors
- Conditional breakpoints
- Variable inspection
Next Steps
Complete your n8n developer journey with testing and CI/CD: Testing & CI/CD Pipelines
Implement comprehensive testing and continuous deployment for n8n nodes