Why Observability Matters in n8n
In production, you can't attach a debugger to see what went wrong. Observability is your window into the system: it tells you not just that something failed, but why, when, and how to fix it. Observability rests on three pillars, shown together in the sketch after this list:
- Logs: What happened and when
- Metrics: How the system is performing
- Traces: How requests flow through the system
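The sketch below shows how a single failed call surfaces through all three pillars at once. It is illustrative only: logger, metrics, tracer, and userApi are placeholders for the tools built throughout this section.
// Illustrative sketch: one failed call, three signals.
async function fetchUser(id: string): Promise<unknown> {
  const span = tracer.startSpan('fetchUser');             // trace: where this call sits in the flow
  const stopTimer = metrics.requestDuration.startTimer(); // metric: how long it took
  try {
    return await userApi.get(id);
  } catch (error) {
    logger.error('fetchUser failed', { userId: id, error }); // log: what happened and when
    throw error;
  } finally {
    stopTimer();
    span.end();
  }
}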
Structured Logging
Building a Production Logging System
import { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import * as winston from 'winston';
export class ObservableNode {
private logger: EnhancedLogger;
constructor() {
this.logger = new EnhancedLogger({
service: 'n8n-custom-node',
environment: process.env.NODE_ENV || 'development',
});
}
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
// Create correlation ID for request tracing
const correlationId = this.generateCorrelationId();
const context = this.buildExecutionContext(correlationId);
// Log execution start
this.logger.info('Node execution started', context);
try {
// Execute with detailed logging
const result = await this.executeWithLogging(context);
this.logger.info('Node execution completed', {
...context,
itemsProcessed: result.length,
executionTime: Date.now() - context.startTime,
});
return [result];
} catch (error) {
this.logger.error('Node execution failed', {
...context,
error: this.serializeError(error),
stack: error.stack,
});
throw error;
}
}
private buildExecutionContext(correlationId: string): ExecutionContext {
return {
correlationId,
workflowId: this.getWorkflow().id,
workflowName: this.getWorkflow().name,
nodeType: this.getNode().type,
nodeName: this.getNode().name,
executionId: this.getExecutionId(),
startTime: Date.now(),
userId: this.getWorkflowStaticData('global').userId,
environment: {
nodeVersion: process.version,
platform: process.platform,
memoryUsage: process.memoryUsage(),
},
};
}
private async executeWithLogging(
context: ExecutionContext
): Promise<INodeExecutionData[]> {
const items = this.getInputData();
const results: INodeExecutionData[] = [];
for (let i = 0; i < items.length; i++) {
const itemContext = { ...context, itemIndex: i };
this.logger.debug('Processing item', itemContext);
try {
const result = await this.processItem(items[i], itemContext);
results.push(result);
this.logger.debug('Item processed successfully', {
...itemContext,
resultSize: JSON.stringify(result).length,
});
} catch (error) {
this.logger.error('Item processing failed', {
...itemContext,
error: this.serializeError(error),
input: this.sanitizeForLogging(items[i]),
});
// Decide whether to continue or fail
if (this.continueOnFail()) {
results.push({
json: { error: error.message },
error: error,
});
} else {
throw error;
}
}
}
return results;
}
private serializeError(error: any): any {
return {
name: error.name,
message: error.message,
code: error.code,
statusCode: error.statusCode,
details: error.details,
};
}
private sanitizeForLogging(data: any): any {
// Remove sensitive data before logging
const sanitized = JSON.parse(JSON.stringify(data));
const sensitiveFields = ['password', 'apiKey', 'token', 'secret'];
const sanitizeObject = (obj: any) => {
for (const key in obj) {
if (sensitiveFields.some(field => key.toLowerCase().includes(field))) {
obj[key] = '[REDACTED]';
} else if (typeof obj[key] === 'object' && obj[key] !== null) {
sanitizeObject(obj[key]);
}
}
};
sanitizeObject(sanitized);
return sanitized;
}
}
class EnhancedLogger {
private winston: winston.Logger;
constructor(private config: LoggerConfig) {
this.winston = winston.createLogger({
level: process.env.LOG_LEVEL || 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.errors({ stack: true }),
winston.format.json()
),
defaultMeta: {
service: config.service,
environment: config.environment,
},
transports: this.getTransports(),
});
}
private getTransports(): winston.transport[] {
const transports: winston.transport[] = [];
// Console transport for development
if (process.env.NODE_ENV !== 'production') {
transports.push(
new winston.transports.Console({
format: winston.format.combine(
winston.format.colorize(),
winston.format.simple()
),
})
);
}
// File transport for production
if (process.env.NODE_ENV === 'production') {
transports.push(
new winston.transports.File({
filename: '/var/log/n8n/error.log',
level: 'error',
maxsize: 5242880, // 5MB
maxFiles: 5,
}),
new winston.transports.File({
filename: '/var/log/n8n/combined.log',
maxsize: 5242880, // 5MB
maxFiles: 5,
})
);
}
// CloudWatch, DataDog, etc.
if (process.env.CLOUDWATCH_ENABLED) {
const CloudWatchTransport = require('winston-cloudwatch');
transports.push(
new CloudWatchTransport({
logGroupName: '/aws/n8n',
logStreamName: `${this.config.service}-${Date.now()}`,
region: process.env.AWS_REGION,
})
);
}
return transports;
}
info(message: string, meta?: any): void {
this.winston.info(message, meta);
}
error(message: string, meta?: any): void {
this.winston.error(message, meta);
}
warn(message: string, meta?: any): void {
this.winston.warn(message, meta);
}
debug(message: string, meta?: any): void {
this.winston.debug(message, meta);
}
}
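In use, each call produces a single JSON document that log indexers can parse without custom grok rules. A minimal usage sketch (the emitted shape is illustrative):
const logger = new EnhancedLogger({
  service: 'n8n-custom-node',
  environment: 'production',
});

logger.info('Node execution started', { correlationId: 'abc-123', workflowId: 'wf-42' });
// Emits roughly (one JSON document per event):
// {"level":"info","message":"Node execution started","correlationId":"abc-123",
//  "workflowId":"wf-42","service":"n8n-custom-node","environment":"production",
//  "timestamp":"2024-01-01T12:00:00.000Z"}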
Log Aggregation Patterns
export class LogAggregatorNode {
private logBuffer: LogEntry[] = [];
private flushInterval: NodeJS.Timeout;
constructor() {
// Batch logs for efficient shipping
this.flushInterval = setInterval(() => {
this.flushLogs();
}, 5000); // Flush every 5 seconds
}
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const operation = this.getNodeParameter('operation', 0) as string;
// Structured logging with context
this.log({
level: 'info',
message: 'Operation started',
operation,
timestamp: new Date().toISOString(),
context: this.getExecutionContext(),
});
try {
const result = await this.performOperation(operation);
this.log({
level: 'info',
message: 'Operation completed',
operation,
duration: this.calculateDuration(),
resultCount: result.length,
});
return [result];
} catch (error) {
this.log({
level: 'error',
message: 'Operation failed',
operation,
error: error.message,
stack: error.stack,
});
throw error;
}
}
private log(entry: LogEntry): void {
// Add to buffer
this.logBuffer.push(entry);
// Immediate flush for errors
if (entry.level === 'error') {
this.flushLogs();
}
// Flush if buffer is getting large
if (this.logBuffer.length >= 100) {
this.flushLogs();
}
}
private async flushLogs(): Promise<void> {
if (this.logBuffer.length === 0) return;
const logs = [...this.logBuffer];
this.logBuffer = [];
try {
// Send to log aggregation service
await this.sendToLogService(logs);
} catch (error) {
console.error('Failed to flush logs:', error);
// Re-add logs to buffer for retry
this.logBuffer.unshift(...logs);
}
}
private async sendToLogService(logs: LogEntry[]): Promise<void> {
// Send to ELK Stack, Datadog, Splunk, etc.
const endpoint = process.env.LOG_AGGREGATOR_ENDPOINT;
if (!endpoint) {
// Fallback to console
logs.forEach(log => console.log(JSON.stringify(log)));
return;
}
await fetch(endpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-API-Key': process.env.LOG_AGGREGATOR_API_KEY!,
},
body: JSON.stringify({
logs,
source: 'n8n-custom-node',
hostname: require('os').hostname(),
}),
});
}
destroy(): void {
clearInterval(this.flushInterval);
this.flushLogs(); // Final flush
}
}
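One design note on the retry path above: unshifting failed logs back into the buffer can grow memory without bound during a long aggregator outage. A bounded variant is sketched below (the cap is an assumption to tune for your payload sizes, not an n8n setting):
const MAX_BUFFERED_LOGS = 10_000; // assumed cap

function rebuffer(buffer: LogEntry[], failed: LogEntry[]): LogEntry[] {
  // Put the failed (older) entries back in front, then drop the oldest
  // entries once the combined buffer exceeds the cap
  return [...failed, ...buffer].slice(-MAX_BUFFERED_LOGS);
}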
Metrics Collection
Custom Metrics Implementation
Metrics are not logs. Logs tell you what happened; metrics tell you how much and how fast. Use both for complete observability.
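For example (a sketch: logger is the EnhancedLogger from above, and paymentsProcessed is a hypothetical prom-client Counter with a currency label):
// Log: one rich record per event, ideal for debugging a specific failure
logger.info('Payment processed', { orderId: 'o-91', amountCents: 4999 });

// Metric: a pre-aggregated series, cheap to store and fast to alert on
paymentsProcessed.inc({ currency: 'USD' });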
import { StatsD } from 'node-statsd';
import * as promClient from 'prom-client';
export class MetricsNode {
private statsd: StatsD;
private promRegistry: promClient.Registry;
private metrics: NodeMetrics;
constructor() {
// Initialize StatsD client
this.statsd = new StatsD({
host: process.env.STATSD_HOST || 'localhost',
port: parseInt(process.env.STATSD_PORT || '8125'),
prefix: 'n8n.custom_node.',
});
// Initialize Prometheus registry
this.promRegistry = new promClient.Registry();
// Define metrics
this.metrics = {
executionCounter: new promClient.Counter({
name: 'node_executions_total',
help: 'Total number of node executions',
labelNames: ['status', 'node_type'],
registers: [this.promRegistry],
}),
executionDuration: new promClient.Histogram({
name: 'node_execution_duration_seconds',
help: 'Duration of node executions',
labelNames: ['node_type'],
buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60],
registers: [this.promRegistry],
}),
itemsProcessed: new promClient.Counter({
name: 'items_processed_total',
help: 'Total number of items processed',
labelNames: ['node_type', 'result'],
registers: [this.promRegistry],
}),
memoryUsage: new promClient.Gauge({
name: 'node_memory_usage_bytes',
help: 'Memory usage of the node',
labelNames: ['type'],
registers: [this.promRegistry],
}),
activeExecutions: new promClient.Gauge({
name: 'active_executions',
help: 'Number of currently active executions',
registers: [this.promRegistry],
}),
};
// Start metrics collection
this.startMetricsCollection();
}
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const timer = this.metrics.executionDuration.startTimer({
node_type: this.getNode().type,
});
// Track active execution
this.metrics.activeExecutions.inc();
try {
// Track memory before execution
const memBefore = process.memoryUsage();
const result = await this.executeWithMetrics();
// Track memory after execution
const memAfter = process.memoryUsage();
const memDelta = memAfter.heapUsed - memBefore.heapUsed;
// Record metrics
this.metrics.executionCounter.inc({
status: 'success',
node_type: this.getNode().type,
});
this.metrics.itemsProcessed.inc(
{
node_type: this.getNode().type,
result: 'success',
},
result.length
);
this.metrics.memoryUsage.set(
{ type: 'heap_used' },
memAfter.heapUsed
);
// Send to StatsD
this.statsd.increment('execution.success');
this.statsd.gauge('memory.delta', memDelta);
this.statsd.histogram('items.count', result.length);
return [result];
} catch (error) {
this.metrics.executionCounter.inc({
status: 'error',
node_type: this.getNode().type,
});
this.statsd.increment('execution.error');
this.statsd.increment(`error.${error.code || 'unknown'}`);
throw error;
    } finally {
      // Stop the Prometheus timer exactly once; it observes and returns the
      // elapsed seconds, which StatsD's timing() expects in milliseconds
      const durationSeconds = timer();
      this.statsd.timing('execution.duration', durationSeconds * 1000);
      this.metrics.activeExecutions.dec();
    }
}
private startMetricsCollection(): void {
// Collect default metrics (CPU, memory, etc.)
promClient.collectDefaultMetrics({
register: this.promRegistry,
prefix: 'n8n_node_',
});
// Export metrics endpoint for Prometheus scraping
if (process.env.METRICS_PORT) {
const express = require('express');
const app = express();
app.get('/metrics', async (req, res) => {
res.set('Content-Type', this.promRegistry.contentType);
const metrics = await this.promRegistry.metrics();
res.end(metrics);
});
app.listen(process.env.METRICS_PORT, () => {
console.log(`Metrics available at :${process.env.METRICS_PORT}/metrics`);
});
}
}
private async executeWithMetrics(): Promise<INodeExecutionData[]> {
const items = this.getInputData();
const results: INodeExecutionData[] = [];
// Track processing rate
const startTime = Date.now();
let processedCount = 0;
for (const item of items) {
const itemTimer = this.metrics.executionDuration.startTimer({
node_type: `${this.getNode().type}_item`,
});
try {
const result = await this.processItem(item);
results.push(result);
processedCount++;
// Report processing rate periodically
if (processedCount % 100 === 0) {
const rate = processedCount / ((Date.now() - startTime) / 1000);
this.statsd.gauge('processing.rate', rate);
}
} catch (error) {
this.metrics.itemsProcessed.inc(
{
node_type: this.getNode().type,
result: 'error',
},
1
);
} finally {
itemTimer();
}
}
return results;
}
}
interface NodeMetrics {
executionCounter: promClient.Counter<string>;
executionDuration: promClient.Histogram<string>;
itemsProcessed: promClient.Counter<string>;
memoryUsage: promClient.Gauge<string>;
activeExecutions: promClient.Gauge<string>;
}
Distributed Tracing
OpenTelemetry Integration
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import * as api from '@opentelemetry/api';
export class TracedNode {
private tracer: api.Tracer;
constructor() {
    // Initialize OpenTelemetry (in production, do this once per process, not per node instance)
const sdk = new NodeSDK({
resource: new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: 'n8n-custom-node',
[SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
}),
instrumentations: [getNodeAutoInstrumentations()],
});
sdk.start();
this.tracer = api.trace.getTracer('n8n-custom-node', '1.0.0');
}
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
// Start root span
return await this.tracer.startActiveSpan(
'node.execute',
async (span) => {
try {
// Add span attributes
span.setAttributes({
'node.type': this.getNode().type,
'node.name': this.getNode().name,
'workflow.id': this.getWorkflow().id,
'execution.id': this.getExecutionId(),
});
// Extract parent context if available
const parentContext = this.extractParentContext();
          if (parentContext) {
            // addLink takes a SpanContext, so unwrap it from the Context
            // first (requires @opentelemetry/api >= 1.8)
            const parentSpanContext = api.trace.getSpan(parentContext)?.spanContext();
            if (parentSpanContext) {
              span.addLink({ context: parentSpanContext });
            }
          }
const result = await this.executeWithTracing(span);
span.setStatus({ code: api.SpanStatusCode.OK });
return [result];
} catch (error) {
span.recordException(error);
span.setStatus({
code: api.SpanStatusCode.ERROR,
message: error.message,
});
throw error;
} finally {
span.end();
}
}
);
}
private async executeWithTracing(
parentSpan: api.Span
): Promise<INodeExecutionData[]> {
const items = this.getInputData();
const results: INodeExecutionData[] = [];
for (let i = 0; i < items.length; i++) {
await this.tracer.startActiveSpan(
'item.process',
{ attributes: { 'item.index': i } },
async (span) => {
try {
            // Add baggage for cross-service propagation. Baggage is immutable:
            // setEntry returns a new baggage that must be attached to a context
            // to take effect.
            const baggage = (
              api.propagation.getBaggage(api.context.active()) ?? api.propagation.createBaggage()
            ).setEntry('item.id', { value: String(i) });
            const baggageContext = api.propagation.setBaggage(api.context.active(), baggage);
            const result = await api.context.with(baggageContext, () =>
              this.processItemWithTracing(items[i], span)
            );
            results.push(result);
span.setStatus({ code: api.SpanStatusCode.OK });
} catch (error) {
span.recordException(error);
span.setStatus({
code: api.SpanStatusCode.ERROR,
message: error.message,
});
} finally {
span.end();
}
}
);
}
return results;
}
private async processItemWithTracing(
item: INodeExecutionData,
parentSpan: api.Span
): Promise<INodeExecutionData> {
    // Create child spans for sub-operations. Note: the OpenTelemetry JS API
    // has no `parent` span option; parentage comes from the context passed
    // as the third argument to startSpan.
    const parentContext = api.trace.setSpan(api.context.active(), parentSpan);
    const validationSpan = this.tracer.startSpan('validate', undefined, parentContext);
    try {
      await this.validateItem(item);
    } finally {
      validationSpan.end();
    }
    const transformSpan = this.tracer.startSpan('transform', undefined, parentContext);
try {
const transformed = await this.transformItem(item);
return transformed;
} finally {
transformSpan.end();
}
}
private extractParentContext(): api.Context | null {
    // Extract trace context from incoming headers or metadata.
    // getIncomingHeader is a stand-in for however your trigger exposes headers.
    const traceHeader = this.getIncomingHeader('traceparent');
if (traceHeader) {
const context = api.propagation.extract(
api.context.active(),
{ traceparent: traceHeader }
);
return context;
}
return null;
}
}
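extractParentContext covers the inbound side. For the outbound side, the active trace context can be injected into outgoing request headers so downstream services join the same trace. A minimal sketch using the standard propagation API (withTraceHeaders is a hypothetical helper):
import * as api from '@opentelemetry/api';

function withTraceHeaders(headers: Record<string, string> = {}): Record<string, string> {
  // Writes 'traceparent' (and 'tracestate', if present) into the headers
  api.propagation.inject(api.context.active(), headers);
  return headers;
}

// Usage:
// await fetch(url, { headers: withTraceHeaders({ 'Content-Type': 'application/json' }) });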
Advanced Debugging Techniques
Remote Debugging Setup
export class DebuggableNode {
  private debugger?: RemoteDebugger;
constructor() {
if (process.env.ENABLE_REMOTE_DEBUG) {
this.debugger = new RemoteDebugger({
port: parseInt(process.env.DEBUG_PORT || '9229'),
breakOnStart: process.env.BREAK_ON_START === 'true',
});
}
}
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
// Conditional breakpoint
if (this.shouldBreak()) {
debugger; // Will pause here if inspector is attached
}
// Debug context injection
const debugContext = this.injectDebugContext();
try {
const result = await this.executeWithDebugging(debugContext);
return [result];
} catch (error) {
// Capture debug snapshot on error
await this.captureDebugSnapshot(error, debugContext);
throw error;
}
}
private shouldBreak(): boolean {
// Break on specific conditions
const breakConditions = process.env.DEBUG_CONDITIONS?.split(',') || [];
return breakConditions.some(condition => {
switch (condition) {
case 'large_payload':
return JSON.stringify(this.getInputData()).length > 10000;
case 'specific_node':
return this.getNode().name === process.env.DEBUG_NODE_NAME;
case 'error_rate_high':
return this.getErrorRate() > 0.1;
default:
return false;
}
});
}
private injectDebugContext(): DebugContext {
return {
sessionId: this.generateSessionId(),
timestamp: Date.now(),
inputs: this.captureInputs(),
environment: this.captureEnvironment(),
stack: new Error().stack,
};
}
private async captureDebugSnapshot(
error: any,
context: DebugContext
): Promise<void> {
const snapshot = {
error: {
message: error.message,
stack: error.stack,
code: error.code,
},
context,
state: {
memory: process.memoryUsage(),
cpu: process.cpuUsage(),
uptime: process.uptime(),
},
      variables: await this.captureLocalVariables(),
timeline: this.getExecutionTimeline(),
};
// Save snapshot for analysis
await this.saveDebugSnapshot(snapshot);
// Send to debug service if configured
if (process.env.DEBUG_SERVICE_URL) {
await this.sendToDebugService(snapshot);
}
}
  private async captureLocalVariables(): Promise<any> {
    // Use the V8 inspector to capture scope information (best effort).
    // The callback API is wrapped in a Promise so the session is not
    // disconnected before the result arrives.
    const inspector = require('inspector');
    const session = new inspector.Session();
    session.connect();
    try {
      return await new Promise((resolve) => {
        session.post(
          'Runtime.evaluate',
          { expression: 'Object.keys(this)', includeCommandLineAPI: true },
          (err: Error | null, result: any) => {
            resolve(err || !result?.result ? {} : { scope: result.result });
          }
        );
      });
    } finally {
      session.disconnect();
    }
  }
private async saveDebugSnapshot(snapshot: any): Promise<void> {
const fs = require('fs').promises;
const path = require('path');
const debugDir = process.env.DEBUG_OUTPUT_DIR || '/tmp/n8n-debug';
await fs.mkdir(debugDir, { recursive: true });
const filename = `debug-${snapshot.context.sessionId}-${Date.now()}.json`;
const filepath = path.join(debugDir, filename);
await fs.writeFile(filepath, JSON.stringify(snapshot, null, 2));
console.log(`Debug snapshot saved to: ${filepath}`);
}
}
class RemoteDebugger {
constructor(private config: DebugConfig) {
if (config.breakOnStart) {
// Start inspector and wait for debugger
const inspector = require('inspector');
inspector.open(config.port, '0.0.0.0', true);
console.log(`Debugger listening on port ${config.port}`);
debugger; // Pause execution
}
}
}
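With the inspector open, attach from Chrome via chrome://inspect, from a terminal with node inspect localhost:9229 (adjust for your DEBUG_PORT), or from VS Code with an attach launch configuration pointing at the same port.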
Performance Profiling
export class ProfiledNode {
  private profiler: Profiler;
  private logger = console; // stand-in; swap in the EnhancedLogger from earlier
constructor() {
this.profiler = new Profiler({
enabled: process.env.PROFILING_ENABLED === 'true',
sampleRate: parseInt(process.env.PROFILING_SAMPLE_RATE || '100'),
});
}
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
// Start CPU profiling
const cpuProfile = await this.profiler.startCPUProfile('node-execution');
// Start heap profiling
const heapSnapshot = this.profiler.takeHeapSnapshot('before');
try {
const result = await this.executeWithProfiling();
// Stop profiling and analyze
const profile = await this.profiler.stopCPUProfile(cpuProfile);
await this.analyzeProfile(profile);
return [result];
} finally {
// Take final heap snapshot
const finalHeap = this.profiler.takeHeapSnapshot('after');
await this.compareHeapSnapshots(heapSnapshot, finalHeap);
}
}
  private async analyzeProfile(profile: CPUProfile): Promise<void> {
    if (!profile.nodes?.length) return; // profiling disabled or export failed
    // Find hot spots
    const hotFunctions = profile.nodes
.sort((a, b) => b.hitCount - a.hitCount)
.slice(0, 10);
this.logger.info('CPU Profile Hot Spots:', {
totalTime: profile.duration,
hotFunctions: hotFunctions.map(f => ({
name: f.functionName,
hits: f.hitCount,
percentage: (f.hitCount / profile.samples.length) * 100,
})),
});
// Check for performance issues
const issues = this.detectPerformanceIssues(profile);
if (issues.length > 0) {
this.logger.warn('Performance issues detected:', issues);
}
}
private detectPerformanceIssues(profile: CPUProfile): string[] {
const issues: string[] = [];
// Check for excessive GC
const gcTime = profile.nodes
.filter(n => n.functionName.includes('GC'))
.reduce((sum, n) => sum + n.hitCount, 0);
if (gcTime / profile.samples.length > 0.1) {
issues.push('Excessive garbage collection (>10% of CPU time)');
}
// Check for blocking operations
const blockingOps = profile.nodes.filter(n =>
['readFileSync', 'writeFileSync', 'execSync'].some(op =>
n.functionName.includes(op)
)
);
if (blockingOps.length > 0) {
issues.push('Blocking I/O operations detected');
}
return issues;
}
}
class Profiler {
private v8Profiler: any;
constructor(private config: ProfilerConfig) {
if (config.enabled) {
this.v8Profiler = require('v8-profiler-next');
this.v8Profiler.setGenerateType(1); // Use profiler tree structure
}
}
async startCPUProfile(title: string): Promise<string> {
if (!this.config.enabled) return '';
const id = `${title}-${Date.now()}`;
this.v8Profiler.startProfiling(id, true);
return id;
}
async stopCPUProfile(id: string): Promise<CPUProfile> {
if (!this.config.enabled) return {} as CPUProfile;
const profile = this.v8Profiler.stopProfiling(id);
const result = await this.exportProfile(profile);
profile.delete();
return result;
}
takeHeapSnapshot(title: string): any {
if (!this.config.enabled) return null;
const snapshot = this.v8Profiler.takeSnapshot(title);
return snapshot;
}
private async exportProfile(profile: any): Promise<CPUProfile> {
return new Promise((resolve) => {
profile.export((error: any, result: string) => {
if (error) {
console.error('Profile export failed:', error);
resolve({} as CPUProfile);
} else {
resolve(JSON.parse(result));
}
});
});
}
}
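In practice, the Profiler above is most useful when a captured profile is written to disk and loaded into Chrome DevTools for flame-graph analysis. A usage sketch (profileWorkload and runWorkload are hypothetical):
import { promises as fs } from 'fs';

async function profileWorkload(profiler: Profiler, runWorkload: () => Promise<void>): Promise<void> {
  const id = await profiler.startCPUProfile('suspect-workload');
  try {
    await runWorkload();
  } finally {
    const profile = await profiler.stopCPUProfile(id);
    // .cpuprofile files can be opened in Chrome DevTools for analysis
    await fs.writeFile(`/tmp/${id}.cpuprofile`, JSON.stringify(profile));
  }
}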
Production Monitoring Dashboard
Real-time Monitoring
import WebSocket from 'ws'; // assumption: the 'ws' package; older Node versions have no global WebSocket
export class MonitoringDashboard {
  private metrics: RealTimeMetrics;
  private websocket?: WebSocket;
constructor() {
this.metrics = new RealTimeMetrics();
// Set up WebSocket for real-time updates
if (process.env.MONITORING_WEBSOCKET_URL) {
this.websocket = new WebSocket(process.env.MONITORING_WEBSOCKET_URL);
this.startRealtimeMonitoring();
}
}
private startRealtimeMonitoring(): void {
setInterval(() => {
const snapshot = {
timestamp: new Date().toISOString(),
node: {
type: 'custom-node',
version: '1.0.0',
},
performance: {
cpu: process.cpuUsage(),
memory: process.memoryUsage(),
eventLoop: this.measureEventLoopLag(),
},
throughput: {
requestsPerSecond: this.metrics.getRequestRate(),
itemsPerSecond: this.metrics.getItemRate(),
bytesPerSecond: this.metrics.getThroughput(),
},
errors: {
rate: this.metrics.getErrorRate(),
recent: this.metrics.getRecentErrors(),
},
latency: {
p50: this.metrics.getPercentile(50),
p95: this.metrics.getPercentile(95),
p99: this.metrics.getPercentile(99),
},
};
// Send to dashboard
if (this.websocket.readyState === WebSocket.OPEN) {
this.websocket.send(JSON.stringify(snapshot));
}
}, 1000); // Update every second
}
private measureEventLoopLag(): number {
const start = process.hrtime.bigint();
setImmediate(() => {
const lag = Number(process.hrtime.bigint() - start) / 1000000;
this.metrics.recordEventLoopLag(lag);
});
return this.metrics.getAverageEventLoopLag();
}
}
class RealTimeMetrics {
private requests: number[] = [];
private items: number[] = [];
private bytes: number[] = [];
private errors: any[] = [];
private latencies: number[] = [];
private eventLoopLags: number[] = [];
private windowSize = 60; // 60 second window
recordRequest(): void {
const now = Date.now();
this.requests.push(now);
this.cleanOldData();
}
recordItems(count: number): void {
this.items.push(...Array(count).fill(Date.now()));
this.cleanOldData();
}
recordBytes(bytes: number): void {
this.bytes.push(bytes);
this.cleanOldData();
}
recordError(error: any): void {
this.errors.push({
timestamp: Date.now(),
error: error.message,
});
this.cleanOldData();
}
recordLatency(ms: number): void {
this.latencies.push(ms);
if (this.latencies.length > 1000) {
this.latencies.shift();
}
}
recordEventLoopLag(ms: number): void {
this.eventLoopLags.push(ms);
if (this.eventLoopLags.length > 100) {
this.eventLoopLags.shift();
}
}
getRequestRate(): number {
const now = Date.now();
const recent = this.requests.filter(t => now - t < 1000);
return recent.length;
}
getItemRate(): number {
const now = Date.now();
const recent = this.items.filter(t => now - t < 1000);
return recent.length;
}
getThroughput(): number {
    const recentBytes = this.bytes.slice(-10);
    if (recentBytes.length === 0) return 0;
    return recentBytes.reduce((a, b) => a + b, 0) / recentBytes.length;
}
getErrorRate(): number {
const now = Date.now();
const recentErrors = this.errors.filter(e => now - e.timestamp < 60000);
const recentRequests = this.requests.filter(t => now - t < 60000);
return recentRequests.length > 0 ? recentErrors.length / recentRequests.length : 0;
}
getRecentErrors(): any[] {
return this.errors.slice(-5);
}
getPercentile(p: number): number {
if (this.latencies.length === 0) return 0;
const sorted = [...this.latencies].sort((a, b) => a - b);
const index = Math.ceil((p / 100) * sorted.length) - 1;
return sorted[index];
}
getAverageEventLoopLag(): number {
if (this.eventLoopLags.length === 0) return 0;
return this.eventLoopLags.reduce((a, b) => a + b, 0) / this.eventLoopLags.length;
}
private cleanOldData(): void {
const cutoff = Date.now() - (this.windowSize * 1000);
this.requests = this.requests.filter(t => t > cutoff);
this.items = this.items.filter(t => t > cutoff);
this.errors = this.errors.filter(e => e.timestamp > cutoff);
}
}
Best Practices Summary
Observability Checklist
Production Observability Requirements
- Structured Logging
  - Use JSON format for machine parsing
  - Include correlation IDs for tracing
  - Sanitize sensitive data
  - Set appropriate log levels
- Comprehensive Metrics
  - Track business metrics (items processed, success rate)
  - Monitor system metrics (CPU, memory, latency)
  - Use histograms for distributions
  - Export for Prometheus/Grafana
- Distributed Tracing
  - Implement OpenTelemetry
  - Propagate trace context
  - Create meaningful spans
  - Include relevant attributes
- Error Tracking
  - Capture full error context
  - Include stack traces in development
  - Group similar errors
  - Alert on error rate thresholds
- Performance Monitoring
  - Profile CPU and memory usage
  - Identify bottlenecks
  - Track event loop lag
  - Monitor garbage collection
- Debugging Support
  - Remote debugging capability
  - Debug snapshots on errors
  - Conditional breakpoints
  - Variable inspection
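A minimal bootstrap sketch ties the checklist together at process start (the function name and SERVICE_NAME variable are assumptions, not n8n APIs; OTEL_EXPORTER_OTLP_ENDPOINT is the standard OpenTelemetry variable):
import * as promClient from 'prom-client';
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';

export function bootstrapObservability() {
  // Structured logging (EnhancedLogger from the first example)
  const logger = new EnhancedLogger({
    service: process.env.SERVICE_NAME || 'n8n-custom-node',
    environment: process.env.NODE_ENV || 'development',
  });

  // Metrics: default process metrics plus a registry for custom ones
  const registry = new promClient.Registry();
  promClient.collectDefaultMetrics({ register: registry });

  // Tracing: opt in only when an OTLP endpoint is configured
  if (process.env.OTEL_EXPORTER_OTLP_ENDPOINT) {
    new NodeSDK({ instrumentations: [getNodeAutoInstrumentations()] }).start();
  }

  return { logger, registry };
}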
Next Steps
Complete your n8n developer journey with testing and CI/CD: see Testing & CI/CD Pipelines to implement comprehensive testing and continuous deployment for n8n nodes.