Why Observability Matters in n8n

In production, you can’t attach a debugger to see what went wrong. Observability is your window into the system - it tells you not just that something failed, but why, when, and how to fix it.
The three pillars of observability in n8n:
  • Logs: What happened and when
  • Metrics: How the system is performing
  • Traces: How requests flow through the system

Structured Logging

Building a Production Logging System

import { IExecuteFunctions, ILogger } from 'n8n-workflow';
import * as winston from 'winston';

export class ObservableNode {
  private logger: EnhancedLogger;

  constructor() {
    this.logger = new EnhancedLogger({
      service: 'n8n-custom-node',
      environment: process.env.NODE_ENV || 'development',
    });
  }

  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    // Create correlation ID for request tracing
    const correlationId = this.generateCorrelationId();
    const context = this.buildExecutionContext(correlationId);

    // Log execution start
    this.logger.info('Node execution started', context);

    try {
      // Execute with detailed logging
      const result = await this.executeWithLogging(context);

      this.logger.info('Node execution completed', {
        ...context,
        itemsProcessed: result.length,
        executionTime: Date.now() - context.startTime,
      });

      return [result];
    } catch (error) {
      this.logger.error('Node execution failed', {
        ...context,
        error: this.serializeError(error),
        stack: error.stack,
      });
      throw error;
    }
  }

  private buildExecutionContext(correlationId: string): ExecutionContext {
    return {
      correlationId,
      workflowId: this.getWorkflow().id,
      workflowName: this.getWorkflow().name,
      nodeType: this.getNode().type,
      nodeName: this.getNode().name,
      executionId: this.getExecutionId(),
      startTime: Date.now(),
      userId: this.getWorkflowStaticData('userId'),
      environment: {
        nodeVersion: process.version,
        platform: process.platform,
        memoryUsage: process.memoryUsage(),
      },
    };
  }

  private async executeWithLogging(
    context: ExecutionContext
  ): Promise<INodeExecutionData[]> {
    const items = this.getInputData();
    const results: INodeExecutionData[] = [];

    for (let i = 0; i < items.length; i++) {
      const itemContext = { ...context, itemIndex: i };

      this.logger.debug('Processing item', itemContext);

      try {
        const result = await this.processItem(items[i], itemContext);
        results.push(result);

        this.logger.debug('Item processed successfully', {
          ...itemContext,
          resultSize: JSON.stringify(result).length,
        });
      } catch (error) {
        this.logger.error('Item processing failed', {
          ...itemContext,
          error: this.serializeError(error),
          input: this.sanitizeForLogging(items[i]),
        });

        // Decide whether to continue or fail
        if (this.continueOnFail()) {
          results.push({
            json: { error: error.message },
            error: error,
          });
        } else {
          throw error;
        }
      }
    }

    return results;
  }

  private serializeError(error: any): any {
    return {
      name: error.name,
      message: error.message,
      code: error.code,
      statusCode: error.statusCode,
      details: error.details,
    };
  }

  private sanitizeForLogging(data: any): any {
    // Remove sensitive data before logging
    const sanitized = JSON.parse(JSON.stringify(data));
    const sensitiveFields = ['password', 'apiKey', 'token', 'secret'];

    const sanitizeObject = (obj: any) => {
      for (const key in obj) {
        if (sensitiveFields.some(field => key.toLowerCase().includes(field))) {
          obj[key] = '[REDACTED]';
        } else if (typeof obj[key] === 'object' && obj[key] !== null) {
          sanitizeObject(obj[key]);
        }
      }
    };

    sanitizeObject(sanitized);
    return sanitized;
  }
}

class EnhancedLogger {
  private winston: winston.Logger;

  constructor(private config: LoggerConfig) {
    this.winston = winston.createLogger({
      level: process.env.LOG_LEVEL || 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.errors({ stack: true }),
        winston.format.json()
      ),
      defaultMeta: {
        service: config.service,
        environment: config.environment,
      },
      transports: this.getTransports(),
    });
  }

  private getTransports(): winston.transport[] {
    const transports: winston.transport[] = [];

    // Console transport for development
    if (process.env.NODE_ENV !== 'production') {
      transports.push(
        new winston.transports.Console({
          format: winston.format.combine(
            winston.format.colorize(),
            winston.format.simple()
          ),
        })
      );
    }

    // File transport for production
    if (process.env.NODE_ENV === 'production') {
      transports.push(
        new winston.transports.File({
          filename: '/var/log/n8n/error.log',
          level: 'error',
          maxsize: 5242880, // 5MB
          maxFiles: 5,
        }),
        new winston.transports.File({
          filename: '/var/log/n8n/combined.log',
          maxsize: 5242880, // 5MB
          maxFiles: 5,
        })
      );
    }

    // CloudWatch, DataDog, etc.
    if (process.env.CLOUDWATCH_ENABLED) {
      const CloudWatchTransport = require('winston-cloudwatch');
      transports.push(
        new CloudWatchTransport({
          logGroupName: '/aws/n8n',
          logStreamName: `${config.service}-${Date.now()}`,
          region: process.env.AWS_REGION,
        })
      );
    }

    return transports;
  }

  info(message: string, meta?: any): void {
    this.winston.info(message, meta);
  }

  error(message: string, meta?: any): void {
    this.winston.error(message, meta);
  }

  warn(message: string, meta?: any): void {
    this.winston.warn(message, meta);
  }

  debug(message: string, meta?: any): void {
    this.winston.debug(message, meta);
  }
}

Log Aggregation Patterns

export class LogAggregatorNode {
  private logBuffer: LogEntry[] = [];
  private flushInterval: NodeJS.Timeout;

  constructor() {
    // Batch logs for efficient shipping
    this.flushInterval = setInterval(() => {
      this.flushLogs();
    }, 5000); // Flush every 5 seconds
  }

  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    const operation = this.getNodeParameter('operation', 0) as string;

    // Structured logging with context
    this.log({
      level: 'info',
      message: 'Operation started',
      operation,
      timestamp: new Date().toISOString(),
      context: this.getExecutionContext(),
    });

    try {
      const result = await this.performOperation(operation);

      this.log({
        level: 'info',
        message: 'Operation completed',
        operation,
        duration: this.calculateDuration(),
        resultCount: result.length,
      });

      return [result];
    } catch (error) {
      this.log({
        level: 'error',
        message: 'Operation failed',
        operation,
        error: error.message,
        stack: error.stack,
      });
      throw error;
    }
  }

  private log(entry: LogEntry): void {
    // Add to buffer
    this.logBuffer.push(entry);

    // Immediate flush for errors
    if (entry.level === 'error') {
      this.flushLogs();
    }

    // Flush if buffer is getting large
    if (this.logBuffer.length >= 100) {
      this.flushLogs();
    }
  }

  private async flushLogs(): Promise<void> {
    if (this.logBuffer.length === 0) return;

    const logs = [...this.logBuffer];
    this.logBuffer = [];

    try {
      // Send to log aggregation service
      await this.sendToLogService(logs);
    } catch (error) {
      console.error('Failed to flush logs:', error);
      // Re-add logs to buffer for retry
      this.logBuffer.unshift(...logs);
    }
  }

  private async sendToLogService(logs: LogEntry[]): Promise<void> {
    // Send to ELK Stack, Datadog, Splunk, etc.
    const endpoint = process.env.LOG_AGGREGATOR_ENDPOINT;

    if (!endpoint) {
      // Fallback to console
      logs.forEach(log => console.log(JSON.stringify(log)));
      return;
    }

    await fetch(endpoint, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'X-API-Key': process.env.LOG_AGGREGATOR_API_KEY!,
      },
      body: JSON.stringify({
        logs,
        source: 'n8n-custom-node',
        hostname: require('os').hostname(),
      }),
    });
  }

  destroy(): void {
    clearInterval(this.flushInterval);
    this.flushLogs(); // Final flush
  }
}

Metrics Collection

Custom Metrics Implementation

Metrics are not logs. Logs tell you what happened, metrics tell you how much and how fast. Use both for complete observability.
import { StatsD } from 'node-statsd';
import * as promClient from 'prom-client';

export class MetricsNode {
  private statsd: StatsD;
  private promRegistry: promClient.Registry;
  private metrics: NodeMetrics;

  constructor() {
    // Initialize StatsD client
    this.statsd = new StatsD({
      host: process.env.STATSD_HOST || 'localhost',
      port: parseInt(process.env.STATSD_PORT || '8125'),
      prefix: 'n8n.custom_node.',
    });

    // Initialize Prometheus registry
    this.promRegistry = new promClient.Registry();

    // Define metrics
    this.metrics = {
      executionCounter: new promClient.Counter({
        name: 'node_executions_total',
        help: 'Total number of node executions',
        labelNames: ['status', 'node_type'],
        registers: [this.promRegistry],
      }),
      executionDuration: new promClient.Histogram({
        name: 'node_execution_duration_seconds',
        help: 'Duration of node executions',
        labelNames: ['node_type'],
        buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60],
        registers: [this.promRegistry],
      }),
      itemsProcessed: new promClient.Counter({
        name: 'items_processed_total',
        help: 'Total number of items processed',
        labelNames: ['node_type', 'result'],
        registers: [this.promRegistry],
      }),
      memoryUsage: new promClient.Gauge({
        name: 'node_memory_usage_bytes',
        help: 'Memory usage of the node',
        labelNames: ['type'],
        registers: [this.promRegistry],
      }),
      activeExecutions: new promClient.Gauge({
        name: 'active_executions',
        help: 'Number of currently active executions',
        registers: [this.promRegistry],
      }),
    };

    // Start metrics collection
    this.startMetricsCollection();
  }

  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    const timer = this.metrics.executionDuration.startTimer({
      node_type: this.getNode().type,
    });

    // Track active execution
    this.metrics.activeExecutions.inc();

    try {
      // Track memory before execution
      const memBefore = process.memoryUsage();

      const result = await this.executeWithMetrics();

      // Track memory after execution
      const memAfter = process.memoryUsage();
      const memDelta = memAfter.heapUsed - memBefore.heapUsed;

      // Record metrics
      this.metrics.executionCounter.inc({
        status: 'success',
        node_type: this.getNode().type,
      });

      this.metrics.itemsProcessed.inc(
        {
          node_type: this.getNode().type,
          result: 'success',
        },
        result.length
      );

      this.metrics.memoryUsage.set(
        { type: 'heap_used' },
        memAfter.heapUsed
      );

      // Send to StatsD
      this.statsd.increment('execution.success');
      this.statsd.timing('execution.duration', timer());
      this.statsd.gauge('memory.delta', memDelta);
      this.statsd.histogram('items.count', result.length);

      return [result];
    } catch (error) {
      this.metrics.executionCounter.inc({
        status: 'error',
        node_type: this.getNode().type,
      });

      this.statsd.increment('execution.error');
      this.statsd.increment(`error.${error.code || 'unknown'}`);

      throw error;
    } finally {
      timer();
      this.metrics.activeExecutions.dec();
    }
  }

  private startMetricsCollection(): void {
    // Collect default metrics (CPU, memory, etc.)
    promClient.collectDefaultMetrics({
      register: this.promRegistry,
      prefix: 'n8n_node_',
    });

    // Export metrics endpoint for Prometheus scraping
    if (process.env.METRICS_PORT) {
      const express = require('express');
      const app = express();

      app.get('/metrics', async (req, res) => {
        res.set('Content-Type', this.promRegistry.contentType);
        const metrics = await this.promRegistry.metrics();
        res.end(metrics);
      });

      app.listen(process.env.METRICS_PORT, () => {
        console.log(`Metrics available at :${process.env.METRICS_PORT}/metrics`);
      });
    }
  }

  private async executeWithMetrics(): Promise<INodeExecutionData[]> {
    const items = this.getInputData();
    const results: INodeExecutionData[] = [];

    // Track processing rate
    const startTime = Date.now();
    let processedCount = 0;

    for (const item of items) {
      const itemTimer = this.metrics.executionDuration.startTimer({
        node_type: `${this.getNode().type}_item`,
      });

      try {
        const result = await this.processItem(item);
        results.push(result);
        processedCount++;

        // Report processing rate periodically
        if (processedCount % 100 === 0) {
          const rate = processedCount / ((Date.now() - startTime) / 1000);
          this.statsd.gauge('processing.rate', rate);
        }
      } catch (error) {
        this.metrics.itemsProcessed.inc(
          {
            node_type: this.getNode().type,
            result: 'error',
          },
          1
        );
      } finally {
        itemTimer();
      }
    }

    return results;
  }
}

interface NodeMetrics {
  executionCounter: promClient.Counter<string>;
  executionDuration: promClient.Histogram<string>;
  itemsProcessed: promClient.Counter<string>;
  memoryUsage: promClient.Gauge<string>;
  activeExecutions: promClient.Gauge<string>;
}

Distributed Tracing

OpenTelemetry Integration

import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import * as api from '@opentelemetry/api';

export class TracedNode {
  private tracer: api.Tracer;

  constructor() {
    // Initialize OpenTelemetry
    const sdk = new NodeSDK({
      resource: new Resource({
        [SemanticResourceAttributes.SERVICE_NAME]: 'n8n-custom-node',
        [SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
      }),
      instrumentations: [getNodeAutoInstrumentations()],
    });

    sdk.start();

    this.tracer = api.trace.getTracer('n8n-custom-node', '1.0.0');
  }

  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    // Start root span
    return await this.tracer.startActiveSpan(
      'node.execute',
      async (span) => {
        try {
          // Add span attributes
          span.setAttributes({
            'node.type': this.getNode().type,
            'node.name': this.getNode().name,
            'workflow.id': this.getWorkflow().id,
            'execution.id': this.getExecutionId(),
          });

          // Extract parent context if available
          const parentContext = this.extractParentContext();
          if (parentContext) {
            span.addLink({ context: parentContext });
          }

          const result = await this.executeWithTracing(span);

          span.setStatus({ code: api.SpanStatusCode.OK });

          return [result];
        } catch (error) {
          span.recordException(error);
          span.setStatus({
            code: api.SpanStatusCode.ERROR,
            message: error.message,
          });
          throw error;
        } finally {
          span.end();
        }
      }
    );
  }

  private async executeWithTracing(
    parentSpan: api.Span
  ): Promise<INodeExecutionData[]> {
    const items = this.getInputData();
    const results: INodeExecutionData[] = [];

    for (let i = 0; i < items.length; i++) {
      await this.tracer.startActiveSpan(
        'item.process',
        { attributes: { 'item.index': i } },
        async (span) => {
          try {
            // Add baggage for cross-service propagation
            const baggage = api.propagation.getBaggage(api.context.active());
            baggage?.setEntry('item.id', { value: String(i) });

            const result = await this.processItemWithTracing(items[i], span);
            results.push(result);

            span.setStatus({ code: api.SpanStatusCode.OK });
          } catch (error) {
            span.recordException(error);
            span.setStatus({
              code: api.SpanStatusCode.ERROR,
              message: error.message,
            });
          } finally {
            span.end();
          }
        }
      );
    }

    return results;
  }

  private async processItemWithTracing(
    item: INodeExecutionData,
    parentSpan: api.Span
  ): Promise<INodeExecutionData> {
    // Create child spans for sub-operations
    const validationSpan = this.tracer.startSpan('validate', {
      parent: parentSpan,
    });

    try {
      await this.validateItem(item);
    } finally {
      validationSpan.end();
    }

    const transformSpan = this.tracer.startSpan('transform', {
      parent: parentSpan,
    });

    try {
      const transformed = await this.transformItem(item);
      return transformed;
    } finally {
      transformSpan.end();
    }
  }

  private extractParentContext(): api.Context | null {
    // Extract trace context from headers or metadata
    const traceHeader = this.getIncomingHeader('traceparent');

    if (traceHeader) {
      const context = api.propagation.extract(
        api.context.active(),
        { traceparent: traceHeader }
      );
      return context;
    }

    return null;
  }
}

Advanced Debugging Techniques

Remote Debugging Setup

export class DebuggableNode {
  private debugger: RemoteDebugger;

  constructor() {
    if (process.env.ENABLE_REMOTE_DEBUG) {
      this.debugger = new RemoteDebugger({
        port: parseInt(process.env.DEBUG_PORT || '9229'),
        breakOnStart: process.env.BREAK_ON_START === 'true',
      });
    }
  }

  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    // Conditional breakpoint
    if (this.shouldBreak()) {
      debugger; // Will pause here if inspector is attached
    }

    // Debug context injection
    const debugContext = this.injectDebugContext();

    try {
      const result = await this.executeWithDebugging(debugContext);
      return [result];
    } catch (error) {
      // Capture debug snapshot on error
      await this.captureDebugSnapshot(error, debugContext);
      throw error;
    }
  }

  private shouldBreak(): boolean {
    // Break on specific conditions
    const breakConditions = process.env.DEBUG_CONDITIONS?.split(',') || [];

    return breakConditions.some(condition => {
      switch (condition) {
        case 'large_payload':
          return JSON.stringify(this.getInputData()).length > 10000;

        case 'specific_node':
          return this.getNode().name === process.env.DEBUG_NODE_NAME;

        case 'error_rate_high':
          return this.getErrorRate() > 0.1;

        default:
          return false;
      }
    });
  }

  private injectDebugContext(): DebugContext {
    return {
      sessionId: this.generateSessionId(),
      timestamp: Date.now(),
      inputs: this.captureInputs(),
      environment: this.captureEnvironment(),
      stack: new Error().stack,
    };
  }

  private async captureDebugSnapshot(
    error: any,
    context: DebugContext
  ): Promise<void> {
    const snapshot = {
      error: {
        message: error.message,
        stack: error.stack,
        code: error.code,
      },
      context,
      state: {
        memory: process.memoryUsage(),
        cpu: process.cpuUsage(),
        uptime: process.uptime(),
      },
      variables: this.captureLocalVariables(),
      timeline: this.getExecutionTimeline(),
    };

    // Save snapshot for analysis
    await this.saveDebugSnapshot(snapshot);

    // Send to debug service if configured
    if (process.env.DEBUG_SERVICE_URL) {
      await this.sendToDebugService(snapshot);
    }
  }

  private captureLocalVariables(): any {
    // Use V8 inspector to capture local scope
    const inspector = require('inspector');
    const session = new inspector.Session();
    session.connect();

    const variables: any = {};

    session.post('Runtime.evaluate', {
      expression: 'Object.keys(this)',
      includeCommandLineAPI: true,
    }, (err, result) => {
      if (!err && result.result) {
        variables.scope = result.result;
      }
    });

    session.disconnect();
    return variables;
  }

  private async saveDebugSnapshot(snapshot: any): Promise<void> {
    const fs = require('fs').promises;
    const path = require('path');

    const debugDir = process.env.DEBUG_OUTPUT_DIR || '/tmp/n8n-debug';
    await fs.mkdir(debugDir, { recursive: true });

    const filename = `debug-${snapshot.context.sessionId}-${Date.now()}.json`;
    const filepath = path.join(debugDir, filename);

    await fs.writeFile(filepath, JSON.stringify(snapshot, null, 2));
    console.log(`Debug snapshot saved to: ${filepath}`);
  }
}

class RemoteDebugger {
  constructor(private config: DebugConfig) {
    if (config.breakOnStart) {
      // Start inspector and wait for debugger
      const inspector = require('inspector');
      inspector.open(config.port, '0.0.0.0', true);
      console.log(`Debugger listening on port ${config.port}`);
      debugger; // Pause execution
    }
  }
}

Performance Profiling

export class ProfiledNode {
  private profiler: Profiler;

  constructor() {
    this.profiler = new Profiler({
      enabled: process.env.PROFILING_ENABLED === 'true',
      sampleRate: parseInt(process.env.PROFILING_SAMPLE_RATE || '100'),
    });
  }

  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    // Start CPU profiling
    const cpuProfile = await this.profiler.startCPUProfile('node-execution');

    // Start heap profiling
    const heapSnapshot = this.profiler.takeHeapSnapshot('before');

    try {
      const result = await this.executeWithProfiling();

      // Stop profiling and analyze
      const profile = await this.profiler.stopCPUProfile(cpuProfile);
      await this.analyzeProfile(profile);

      return [result];
    } finally {
      // Take final heap snapshot
      const finalHeap = this.profiler.takeHeapSnapshot('after');
      await this.compareHeapSnapshots(heapSnapshot, finalHeap);
    }
  }

  private async analyzeProfile(profile: CPUProfile): Promise<void> {
    // Find hot spots
    const hotFunctions = profile.nodes
      .sort((a, b) => b.hitCount - a.hitCount)
      .slice(0, 10);

    this.logger.info('CPU Profile Hot Spots:', {
      totalTime: profile.duration,
      hotFunctions: hotFunctions.map(f => ({
        name: f.functionName,
        hits: f.hitCount,
        percentage: (f.hitCount / profile.samples.length) * 100,
      })),
    });

    // Check for performance issues
    const issues = this.detectPerformanceIssues(profile);
    if (issues.length > 0) {
      this.logger.warn('Performance issues detected:', issues);
    }
  }

  private detectPerformanceIssues(profile: CPUProfile): string[] {
    const issues: string[] = [];

    // Check for excessive GC
    const gcTime = profile.nodes
      .filter(n => n.functionName.includes('GC'))
      .reduce((sum, n) => sum + n.hitCount, 0);

    if (gcTime / profile.samples.length > 0.1) {
      issues.push('Excessive garbage collection (>10% of CPU time)');
    }

    // Check for blocking operations
    const blockingOps = profile.nodes.filter(n =>
      ['readFileSync', 'writeFileSync', 'execSync'].some(op =>
        n.functionName.includes(op)
      )
    );

    if (blockingOps.length > 0) {
      issues.push('Blocking I/O operations detected');
    }

    return issues;
  }
}

class Profiler {
  private v8Profiler: any;

  constructor(private config: ProfilerConfig) {
    if (config.enabled) {
      this.v8Profiler = require('v8-profiler-next');
      this.v8Profiler.setGenerateType(1); // Use profiler tree structure
    }
  }

  async startCPUProfile(title: string): Promise<string> {
    if (!this.config.enabled) return '';

    const id = `${title}-${Date.now()}`;
    this.v8Profiler.startProfiling(id, true);
    return id;
  }

  async stopCPUProfile(id: string): Promise<CPUProfile> {
    if (!this.config.enabled) return {} as CPUProfile;

    const profile = this.v8Profiler.stopProfiling(id);
    const result = await this.exportProfile(profile);
    profile.delete();
    return result;
  }

  takeHeapSnapshot(title: string): any {
    if (!this.config.enabled) return null;

    const snapshot = this.v8Profiler.takeSnapshot(title);
    return snapshot;
  }

  private async exportProfile(profile: any): Promise<CPUProfile> {
    return new Promise((resolve) => {
      profile.export((error: any, result: string) => {
        if (error) {
          console.error('Profile export failed:', error);
          resolve({} as CPUProfile);
        } else {
          resolve(JSON.parse(result));
        }
      });
    });
  }
}

Production Monitoring Dashboard

Real-time Monitoring

export class MonitoringDashboard {
  private metrics: RealTimeMetrics;
  private websocket: WebSocket;

  constructor() {
    this.metrics = new RealTimeMetrics();

    // Set up WebSocket for real-time updates
    if (process.env.MONITORING_WEBSOCKET_URL) {
      this.websocket = new WebSocket(process.env.MONITORING_WEBSOCKET_URL);
      this.startRealtimeMonitoring();
    }
  }

  private startRealtimeMonitoring(): void {
    setInterval(() => {
      const snapshot = {
        timestamp: new Date().toISOString(),
        node: {
          type: 'custom-node',
          version: '1.0.0',
        },
        performance: {
          cpu: process.cpuUsage(),
          memory: process.memoryUsage(),
          eventLoop: this.measureEventLoopLag(),
        },
        throughput: {
          requestsPerSecond: this.metrics.getRequestRate(),
          itemsPerSecond: this.metrics.getItemRate(),
          bytesPerSecond: this.metrics.getThroughput(),
        },
        errors: {
          rate: this.metrics.getErrorRate(),
          recent: this.metrics.getRecentErrors(),
        },
        latency: {
          p50: this.metrics.getPercentile(50),
          p95: this.metrics.getPercentile(95),
          p99: this.metrics.getPercentile(99),
        },
      };

      // Send to dashboard
      if (this.websocket.readyState === WebSocket.OPEN) {
        this.websocket.send(JSON.stringify(snapshot));
      }
    }, 1000); // Update every second
  }

  private measureEventLoopLag(): number {
    const start = process.hrtime.bigint();
    setImmediate(() => {
      const lag = Number(process.hrtime.bigint() - start) / 1000000;
      this.metrics.recordEventLoopLag(lag);
    });
    return this.metrics.getAverageEventLoopLag();
  }
}

class RealTimeMetrics {
  private requests: number[] = [];
  private items: number[] = [];
  private bytes: number[] = [];
  private errors: any[] = [];
  private latencies: number[] = [];
  private eventLoopLags: number[] = [];

  private windowSize = 60; // 60 second window

  recordRequest(): void {
    const now = Date.now();
    this.requests.push(now);
    this.cleanOldData();
  }

  recordItems(count: number): void {
    this.items.push(...Array(count).fill(Date.now()));
    this.cleanOldData();
  }

  recordBytes(bytes: number): void {
    this.bytes.push(bytes);
    this.cleanOldData();
  }

  recordError(error: any): void {
    this.errors.push({
      timestamp: Date.now(),
      error: error.message,
    });
    this.cleanOldData();
  }

  recordLatency(ms: number): void {
    this.latencies.push(ms);
    if (this.latencies.length > 1000) {
      this.latencies.shift();
    }
  }

  recordEventLoopLag(ms: number): void {
    this.eventLoopLags.push(ms);
    if (this.eventLoopLags.length > 100) {
      this.eventLoopLags.shift();
    }
  }

  getRequestRate(): number {
    const now = Date.now();
    const recent = this.requests.filter(t => now - t < 1000);
    return recent.length;
  }

  getItemRate(): number {
    const now = Date.now();
    const recent = this.items.filter(t => now - t < 1000);
    return recent.length;
  }

  getThroughput(): number {
    const recentBytes = this.bytes.slice(-10);
    return recentBytes.reduce((a, b) => a + b, 0) / recentBytes.length;
  }

  getErrorRate(): number {
    const now = Date.now();
    const recentErrors = this.errors.filter(e => now - e.timestamp < 60000);
    const recentRequests = this.requests.filter(t => now - t < 60000);
    return recentRequests.length > 0 ? recentErrors.length / recentRequests.length : 0;
  }

  getRecentErrors(): any[] {
    return this.errors.slice(-5);
  }

  getPercentile(p: number): number {
    if (this.latencies.length === 0) return 0;
    const sorted = [...this.latencies].sort((a, b) => a - b);
    const index = Math.ceil((p / 100) * sorted.length) - 1;
    return sorted[index];
  }

  getAverageEventLoopLag(): number {
    if (this.eventLoopLags.length === 0) return 0;
    return this.eventLoopLags.reduce((a, b) => a + b, 0) / this.eventLoopLags.length;
  }

  private cleanOldData(): void {
    const cutoff = Date.now() - (this.windowSize * 1000);
    this.requests = this.requests.filter(t => t > cutoff);
    this.items = this.items.filter(t => t > cutoff);
    this.errors = this.errors.filter(e => e.timestamp > cutoff);
  }
}

Best Practices Summary

Observability Checklist

  1. Structured Logging
    • Use JSON format for machine parsing
    • Include correlation IDs for tracing
    • Sanitize sensitive data
    • Set appropriate log levels
  2. Comprehensive Metrics
    • Track business metrics (items processed, success rate)
    • Monitor system metrics (CPU, memory, latency)
    • Use histograms for distributions
    • Export for Prometheus/Grafana
  3. Distributed Tracing
    • Implement OpenTelemetry
    • Propagate trace context
    • Create meaningful spans
    • Include relevant attributes
  4. Error Tracking
    • Capture full error context
    • Include stack traces in development
    • Group similar errors
    • Alert on error rate thresholds
  5. Performance Monitoring
    • Profile CPU and memory usage
    • Identify bottlenecks
    • Track event loop lag
    • Monitor garbage collection
  6. Debugging Support
    • Remote debugging capability
    • Debug snapshots on errors
    • Conditional breakpoints
    • Variable inspection

Next Steps

Complete your n8n developer journey with testing and CI/CD:

Testing & CI/CD Pipelines

Implement comprehensive testing and continuous deployment for n8n nodes