By Doug Silkstone | January 13, 2025

Your e-commerce chatbot sits in the corner of the screen, waiting. A user browses dairy-free products for 10 minutes, then abandons their cart. The chatbot never spoke up. Another user stares at delivery options, clearly confused. The chatbot stays silent. A third user’s payment fails because the provider is experiencing issues. The chatbot learns about it when the user rage-types “WHY ISN’T THIS WORKING?”

This is the reality of reactive chatbots: they’re support parachutes, not journey participants.

After building proactive AI systems for DTC platforms like Rohlik.cz (a grocery delivery service processing thousands of orders daily), I’ve learned that the difference between a good chatbot and a transformative one isn’t the language model; it’s the architecture. Here’s how to turn a reactive chatbot into an always-on AI assistant that anticipates needs, prevents problems, and guides users to conversion.

The Fundamental Architecture Shift

Traditional chatbots operate on a simple loop:
  1. User asks question
  2. LLM generates answer
  3. Wait for next question
This model is fundamentally passive. The AI has no awareness of:
  • What the user is doing right now
  • What inventory just changed
  • Which delivery slots opened
  • When payment providers are struggling
  • What other users are experiencing
The event-driven model inverts this loop:

Reactive Model

User initiates → AI responds → Wait for next question

Event-Driven Model

Continuous event streams → AI evaluates context → Proactive action
Instead of waiting for questions, your AI subscribes to a stream of events from multiple sources and decides when to participate in the user’s journey.

The Core Components

Here’s the high-level architecture:
┌─────────────────┐ WebSocket  ┌──────────────────┐
│    Browser      │───────────>│  Event Gateway   │
│  (user events)  │            │   (ingestion)    │
└────────┬────────┘            └────────┬─────────┘
         │                              │
         │ WebSocket (actions)          ▼
         │◄────────────────────  ┌──────────────┐
         │                       │  Event Bus   │
         │                       │ (Kafka/NATS) │
         │                       └──────┬───────┘
         │                              │
         │                              ▼
         │                       ┌─────────────────┐
         │                       │  Agent Runtime  │
         │◄──────────────────────│ (LLM orchestr.) │
         │   Agent Commands      └─────────────────┘
Let me break down each layer and show you how it works in practice.

Part 1: The Three Event Types

Your AI assistant needs to consume events from three distinct sources to build complete situational awareness.

1. User Events (Browser-Generated)

These are captured in real time as users interact with your site.
Behavioral signals:
  • Page views and routing changes
  • Scroll depth and dwell time
  • Mouse movement patterns
  • Form input changes
  • Idle detection
Commerce signals:
  • Product views
  • Basket state changes
  • Checkout step progression
  • Error banner displays
  • Feature interactions
Example semantic events:
{
  "source": "browser",
  "type": "LongDwellOnDeliveryOption",
  "userId": "u482",
  "sessionId": "sess_123",
  "payload": {
    "currentStep": "delivery-selection",
    "dwellTime": 47000,
    "mouseOscillation": "high"
  },
  "timestamp": "2025-01-13T20:51:03Z"
}
This tells the agent: “User has been staring at delivery options for 47 seconds with erratic mouse movement—they’re probably confused or stuck.”
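Something has to derive a semantic event like the one above from raw pointer data, either in the worker or at the gateway. Below is a minimal sketch. It assumes a hypothetical `PointerSample` shape and illustrative thresholds: 30 seconds of dwell, and at least one left/right reversal per second to count as "high" oscillation.

```typescript
// Hypothetical pointer sample: t = epoch ms, x = horizontal position in px.
interface PointerSample {
  t: number;
  x: number;
}

interface DwellEvent {
  source: "browser";
  type: "LongDwellOnDeliveryOption";
  userId: string;
  sessionId: string;
  payload: {
    currentStep: string;
    dwellTime: number;
    mouseOscillation: "low" | "high";
  };
  timestamp: string;
}

// Illustrative thresholds (assumptions, not measured values).
const DWELL_THRESHOLD_MS = 30_000;
const HIGH_REVERSALS_PER_SEC = 1;

// Count left/right direction changes, skipping samples with no horizontal movement.
function countReversals(samples: PointerSample[]): number {
  let reversals = 0;
  let lastDirection = 0;
  for (let i = 1; i < samples.length; i++) {
    const direction = Math.sign(samples[i].x - samples[i - 1].x);
    if (direction === 0) continue;
    if (lastDirection !== 0 && direction !== lastDirection) reversals++;
    lastDirection = direction;
  }
  return reversals;
}

// Returns a semantic event once the user has lingered on the delivery step past the threshold.
function deriveDeliveryDwellEvent(
  userId: string,
  sessionId: string,
  stepEnteredAt: number,
  samples: PointerSample[],
  now: number
): DwellEvent | null {
  const dwellTime = now - stepEnteredAt;
  if (dwellTime < DWELL_THRESHOLD_MS) return null;
  const reversalsPerSec = countReversals(samples) / (dwellTime / 1000);
  return {
    source: "browser",
    type: "LongDwellOnDeliveryOption",
    userId,
    sessionId,
    payload: {
      currentStep: "delivery-selection",
      dwellTime,
      mouseOscillation: reversalsPerSec >= HIGH_REVERSALS_PER_SEC ? "high" : "low",
    },
    timestamp: new Date(now).toISOString(),
  };
}
```

Reversal rate is only one proxy for "erratic" movement. Path length or hover time per option would work as well, and every threshold here should be tuned on real sessions.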

2. System Events (Commerce Platform)

These come from your backend services and inventory systems.
Inventory signals:
  • Stock level changes
  • Variants becoming unavailable
  • Reservation failures
  • Restock notifications
Delivery signals:
  • Slot capacity changes
  • New slots released
  • ETA updates
  • Fulfillment center congestion
Pricing signals:
  • Price drops
  • Discount eligibility
  • Promotion activation
Infrastructure signals:
  • Payment service degradation
  • API latency spikes
  • Feature flag changes
Example:
{
  "source": "inventory",
  "type": "stock.below_threshold",
  "productId": "prod_789",
  "variant": "size_M",
  "stock": 3,
  "threshold": 5,
  "timestamp": "2025-01-13T20:51:00Z"
}

3. External Integration Events

These arrive from third-party services and external systems.
Logistics:
  • Courier delay estimates (DPD, UPS)
  • Weather-based delivery impacts
  • Same-day cutoff time changes
Payments:
  • Stripe/Adyen health status
  • Payment method availability
  • Fraud score changes
Customer data:
  • CRM/CDP profile updates
  • Loyalty tier changes
  • Segmentation assignments
  • Personalization engine outputs
Example:
{
  "source": "stripe",
  "type": "payment.provider_degraded",
  "provider": "card_payments",
  "degradationLevel": "moderate",
  "alternativeProvider": "paypal",
  "timestamp": "2025-01-13T20:50:45Z"
}
When the AI sees this combined with a user at checkout, it can proactively suggest: “Our payment provider is having issues. PayPal is more reliable right now—should I switch the checkout flow?”

Part 2: WebSocket Architecture

You need two separate WebSocket channels to avoid interference and simplify backpressure handling.

Outbound Channel (Browser → Backend)

Purpose: Stream raw user events to the backend
Endpoint: wss://events.example.com/client
Payload structure:
interface BrowserEvent {
  type: string;           // 'page.view' | 'form.input' | 'basket.update'
  timestamp: string;      // ISO 8601
  sessionId: string;
  userId?: string;
  payload: Record<string, any>;
}
Implementation considerations:
  • Use Web Workers to keep event capture lightweight
  • Implement debouncing to prevent event floods (e.g., mouse movement)
  • Batch events if network is slow (max 100ms buffer)
  • Heartbeat every 15-30 seconds to detect disconnection
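The heartbeat bullet comes down to two small pieces of logic: deciding when a silent connection is dead, and deciding how long to wait before reconnecting. This sketch assumes a 20-second ping interval (inside the 15-30 second range above), a grace period of two missed beats, and capped exponential backoff. `HeartbeatMonitor` and `reconnectDelayMs` are hypothetical names. Time is passed in explicitly so the logic can be tested without a socket.

```typescript
// Tracks liveness using application-level ping/pong timestamps.
class HeartbeatMonitor {
  private lastPongAt: number;
  private readonly intervalMs: number;
  private readonly missedBeatsAllowed: number;

  constructor(now: number, intervalMs = 20_000, missedBeatsAllowed = 2) {
    this.lastPongAt = now;
    this.intervalMs = intervalMs;
    this.missedBeatsAllowed = missedBeatsAllowed;
  }

  recordPong(now: number): void {
    this.lastPongAt = now;
  }

  // Dead once more than `missedBeatsAllowed` intervals pass without a pong.
  isDead(now: number): boolean {
    return now - this.lastPongAt > this.intervalMs * this.missedBeatsAllowed;
  }
}

// Capped exponential backoff for reconnect attempts (attempt 0 = first retry).
function reconnectDelayMs(attempt: number, baseMs = 500, maxMs = 30_000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}
```

In the browser, a timer would send a `ping` message every interval and call `recordPong` when the server replies. Once `isDead` returns true, it would close the socket and reconnect after `reconnectDelayMs(attempt)`. The ping has to be an ordinary message because the browser WebSocket API doesn't expose protocol-level ping/pong frames. Adding random jitter to the delay keeps thousands of tabs from reconnecting in lockstep after an outage.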

Inbound Channel (Backend → Browser)

Purpose: Receive agent commands and execute actions
Endpoint: wss://actions.example.com/client
Command types:
1. UI Suggestions
{
  "type": "ui.suggestion",
  "text": "Delivery slots around 6–8pm are usually cheapest. Want me to reserve one?",
  "priority": "medium",
  "actionable": true
}
2. DOM Manipulation
{
  "type": "dom.fill",
  "selector": "#address-line-1",
  "value": "123 Main Street",
  "annotation": "Using your saved home address"
}
3. Prefetch Operations
{
  "type": "prefetch.deliverySlots",
  "payload": { "postcode": "SW1A 1AA" }
}
4. Multi-Step Workflows
{
  "type": "workflow.start",
  "name": "delivery_estimator",
  "steps": [
    "open_delivery_menu",
    "fetch_slots",
    "highlight_best_slot"
  ]
}
Using two separate channels prevents action commands from getting stuck behind high-volume user event streams, ensuring responsive UI updates even under load.
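On the client, the four command types map naturally onto a TypeScript discriminated union, so the compiler can check that every command has a handler. In this sketch, `CommandHandlers` is a hypothetical interface standing in for your UI layer. `parseCommand` only checks the `type` tag; production code should validate every field before it touches the DOM.

```typescript
type AgentCommand =
  | { type: "ui.suggestion"; text: string; priority: "low" | "medium" | "high"; actionable: boolean }
  | { type: "dom.fill"; selector: string; value: string; annotation?: string }
  | { type: "prefetch.deliverySlots"; payload: { postcode: string } }
  | { type: "workflow.start"; name: string; steps: string[] };

// Hypothetical UI hooks: render a suggestion, fill an input, warm a cache, run a workflow.
interface CommandHandlers {
  showSuggestion(text: string, priority: string, actionable: boolean): void;
  fillField(selector: string, value: string, annotation?: string): void;
  prefetchSlots(postcode: string): void;
  startWorkflow(name: string, steps: string[]): void;
}

const KNOWN_TYPES = new Set(["ui.suggestion", "dom.fill", "prefetch.deliverySlots", "workflow.start"]);

// Returns null for malformed JSON or unknown command types instead of throwing.
function parseCommand(raw: string): AgentCommand | null {
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null; // not JSON
  }
  if (typeof data !== "object" || data === null) return null;
  const type = (data as { type?: unknown }).type;
  return typeof type === "string" && KNOWN_TYPES.has(type) ? (data as AgentCommand) : null;
}

function dispatch(cmd: AgentCommand, h: CommandHandlers): void {
  switch (cmd.type) {
    case "ui.suggestion":
      h.showSuggestion(cmd.text, cmd.priority, cmd.actionable);
      break;
    case "dom.fill":
      h.fillField(cmd.selector, cmd.value, cmd.annotation);
      break;
    case "prefetch.deliverySlots":
      h.prefetchSlots(cmd.payload.postcode);
      break;
    case "workflow.start":
      h.startWorkflow(cmd.name, cmd.steps);
      break;
    default: {
      const unreachable: never = cmd; // compile error if a command type is left unhandled
      throw new Error(`Unhandled command: ${JSON.stringify(unreachable)}`);
    }
  }
}
```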

Part 3: The Signal Derivation Layer

Here’s where most implementations fail. If you push raw events directly to your LLM agent, you’ll create two critical problems:

Problem 1: Event Floods

When 500 users are online and one delivery slot opens:
  • Backend generates 50 low-level updates as services reconcile
  • Without filtering, 500 agent instances receive 50 events each
  • Each agent tries to send a suggestion
  • Users get spammed with outdated suggestions
  • The slot is gone before anyone can claim it

Problem 2: Lack of Semantic Meaning

Raw events are noisy:
{"event": "slot_capacity_changed", "zone": "Z3", "from": 9, "to": 10}
{"event": "slot_indexed", "zone": "Z3", "slot": "18:00"}
{"event": "slot_reconciliation", "zone": "Z3"}
These need to collapse into one semantic signal:
{
  "type": "delivery.slot_opened",
  "slot": "18:00-20:00",
  "zone": "Z3",
  "change": "added",
  "priority": "medium",
  "reason": "new_capacity"
}

The Solution: Signal Derivation Architecture

┌────────────────┐
│  Raw Events    │  (thousands/sec, low meaning)
└────────┬───────┘
         │
         ▼
┌──────────────────────────┐
│ Signal Derivation Layer  │
│  - Aggregation           │
│  - Deduplication         │
│  - Threshold filtering   │
│  - Temporal stitching    │
└────────┬─────────────────┘
         │
         ▼ (few/sec, high meaning)
┌────────────────────┐
│ Relevance Router   │
│  - User matching   │
│  - Priority scoring│
│  - Cohort filtering│
└────────┬───────────┘
         │
         ▼ (1-6 users)
┌────────────────┐
│  Agent Runtime │
└────────────────┘
What the Signal Derivation Layer does:
  1. Deduplication: Collapse identical events within a time window (e.g., 3 seconds)
  2. Aggregation: Combine related events into higher-order signals
  3. Threshold filtering: Only emit signals when thresholds are crossed
  4. Trend detection: Track changes over time (stock trending down, slot availability improving)
  5. Semantic enrichment: Add business context and meaning
Example in practice:
Raw events (50 events in 3 seconds):
slot_updated zone=Z3 capacity=9→10
slot_updated zone=Z3 capacity=10→10
slot_indexed zone=Z3 slot=18:00
slot_indexed zone=Z3 slot=18:00
...
Derived signal (1 event):
{
  "type": "delivery.slot_opened",
  "slot": "18:00-20:00",
  "zone": "Z3",
  "impactedPostcodes": ["11000", "11001", "11002"],
  "estimatedDemand": "high",
  "priority": "medium"
}
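Threshold filtering (step 3 in the list above) needs memory: a stock level that hovers between 4 and 6 units shouldn't fire a new signal on every update. One common approach is hysteresis, assumed here rather than taken from a specific system. The filter emits once when the value drops to the threshold and re-arms only after it recovers above the threshold plus a margin. The class name and the margin are illustrative.

```typescript
// Fires once when a value drops to or below `threshold`, then stays quiet until the
// value recovers above `threshold + hysteresis` (an assumed margin against flapping).
class ThresholdCrossingFilter {
  private readonly armed = new Map<string, boolean>();
  private readonly threshold: number;
  private readonly hysteresis: number;

  constructor(threshold: number, hysteresis: number) {
    this.threshold = threshold;
    this.hysteresis = hysteresis;
  }

  // Returns true when this update should produce a signal for entity `key`.
  update(key: string, value: number): boolean {
    const isArmed = this.armed.get(key) ?? true;
    if (isArmed && value <= this.threshold) {
      this.armed.set(key, false);
      return true;
    }
    if (!isArmed && value > this.threshold + this.hysteresis) {
      this.armed.set(key, true);
    }
    return false;
  }
}
```

Keyed by product and variant, a filter like this would sit inside the derivation layer. Stock falling 5, 4, 3 then yields one scarcity signal rather than three.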

Part 4: The Relevance Router

Now you have clean semantic signals, but you still need to decide which users should receive them. This is the hardest part of the system because it determines whether your AI feels helpful or annoying.

The Routing Decision

For the delivery slot example above, you need to filter 500 active users down to maybe 6 who should be notified.
Routing criteria:
Geographic relevance:
  • User’s delivery postcode matches the slot’s zone
Behavioral relevance:
  • User is currently on the checkout delivery step
  • User has been dwelling on delivery options (indicating hesitation)
  • User tried to select a slot but none were available
Historical relevance:
  • User historically prefers evening slots
  • User previously complained about delivery timing
  • User has abandoned cart at delivery step before
Business rules:
  • VIP customers get first access to scarce slots
  • High cart value users get priority
  • Users closest to conversion get priority

Priority Scoring Example

function calculateRelevanceScore(user: User, signal: DeliverySlotSignal): number {
  let score = 0;

  // Geographic match (required)
  if (!user.postcode.matchesZone(signal.zone)) return 0;

  // Currently viewing delivery options (high value)
  if (user.currentStep === 'delivery-selection') score += 50;

  // Dwelling/stuck (indicates need)
  if (user.dwellTime > 30000) score += 30;

  // Historical preference match
  if (user.preferredSlotTime === signal.slot.startTime) score += 20;

  // Previously had no slots available
  if (user.sessionEvents.includes('no_slots_available')) score += 40;

  // Business rules
  if (user.tier === 'vip') score += 25;
  if (user.cartValue > 100) score += 15;

  return score;
}
Users with scores above a threshold (e.g., 70) receive the signal. Everyone else doesn’t see it. Result: 500 users → 6 highly relevant users get notified.
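Here is a self-contained version of the scoring function with the threshold applied. It assumes each user's postcode has already been resolved to a zone, replacing the hypothetical `matchesZone` helper. It also adds a `maxRecipients` cap, which is an assumption and not part of the original rule set.

```typescript
interface ActiveUser {
  id: string;
  zone: string; // assumed: postcode already resolved to a delivery zone
  currentStep: string;
  dwellTimeMs: number;
  preferredSlotStart?: string;
  sessionEvents: string[];
  tier: "standard" | "vip";
  cartValue: number;
}

interface DeliverySlotSignal {
  zone: string;
  slotStart: string; // e.g. "18:00"
}

function relevanceScore(user: ActiveUser, signal: DeliverySlotSignal): number {
  if (user.zone !== signal.zone) return 0; // geographic match is required
  let score = 0;
  if (user.currentStep === "delivery-selection") score += 50;
  if (user.dwellTimeMs > 30_000) score += 30;
  if (user.preferredSlotStart === signal.slotStart) score += 20;
  if (user.sessionEvents.includes("no_slots_available")) score += 40;
  if (user.tier === "vip") score += 25;
  if (user.cartValue > 100) score += 15;
  return score;
}

// Users scoring above the threshold, best first, capped at maxRecipients.
function routeSignal(
  users: ActiveUser[],
  signal: DeliverySlotSignal,
  threshold = 70,
  maxRecipients = 10
): Array<{ userId: string; score: number }> {
  return users
    .map((u) => ({ userId: u.id, score: relevanceScore(u, signal) }))
    .filter((r) => r.score > threshold)
    .sort((a, b) => b.score - a.score)
    .slice(0, maxRecipients);
}
```

Sorting before the cap means that if more users qualify than you want to notify, the highest-scoring ones win. This feeds directly into the sequential offering described next.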

Handling Scarcity and Conflicts

When multiple users might want the same scarce resource:
1. Centralized locking
{
  "type": "delivery.slot_offered",
  "slot": "18:00-20:00",
  "zone": "Z3",
  "userId": "u482",
  "reservationExpiry": "2025-01-13T20:53:00Z"
}
The reservationExpiry acts as a 90-second soft hold: if the user doesn't act before it passes, the slot is released.
2. Sequential offering
  • Offer to highest-priority user first
  • If they ignore/decline, release lock
  • Offer to next-highest priority user
  • Repeat until claimed or expired
3. Alternative suggestions
  • If slot is taken, suggest similar alternatives
  • Use knowledge graph to find comparable options

Part 5: The Knowledge Graph Layer

You don’t need a knowledge graph for everything, but it’s incredibly useful for relationship traversal and semantic reasoning.

The Three-Layer Model

Layer 1: Domain Model (Knowledge Graph)
  • Purpose: Store static and slow-changing relationships
  • Technology: Neo4j, Amazon Neptune, or even Postgres with recursive CTEs
  • Contents:
    • Products → Variants → Stock levels
    • Postcodes → Delivery Zones → Slot Pools
    • Users → Preferences → Historical choices
    • Payment Methods → Risk Profiles
    • Categories → Substitutes → Alternatives
What it’s good for:
// Find all users impacted by a slot change
MATCH (zone:Zone {id: 'Z3'})-[:SERVES]->(postcode:Postcode)
MATCH (user:User)-[:LIVES_IN]->(postcode)
MATCH (user)-[:IN_SESSION]->(:Session {active: true})
WHERE user.currentStep = 'delivery-selection'
RETURN user.id, user.tier, user.cartValue
ORDER BY user.tier DESC, user.cartValue DESC
This query tells you which active users might care about a delivery slot change in zone Z3, pre-sorted by business priority.
Layer 2: Signal Derivation (Stream Processor)
  • Purpose: Real-time, high-frequency event processing
  • Technology: Kafka Streams, Flink, or Redis Streams
  • DON’T use the KG for this: Graphs are too slow for high-throughput event processing
Layer 3: Relevance Router
  • Purpose: Combine KG relationships with real-time session data
  • Pattern: Query KG for structural relationships, then filter by live session state

Example: Multi-Layer Decision

Scenario: Size M of a popular item drops to 3 units in stock.
Layer 1 (KG): Find related users
MATCH (product:Product {id: 'prod_789'})-[:VARIANT]->(variant:Variant {size: 'M'})
MATCH (user:User)-[view:VIEWED]->(variant)
WHERE view.viewedAt > datetime() - duration('PT10M')
RETURN user.id, user.sessionId
Layer 2 (Stream): Derive semantic signal
{
  "type": "inventory.scarcity_alert",
  "product": "prod_789",
  "variant": "size_M",
  "stock": 3,
  "urgency": "high",
  "reason": "popular_item_low_stock"
}
Layer 3 (Router): Apply session filters
  • User currently viewing this product: Priority 1
  • User has item in cart: Priority 2
  • User viewed in last 10 min: Priority 3
Result: Only show alert to Priority 1 users (maybe 2-3 people), soft-reserve inventory for them.
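Layer 3 then reduces to a pure function over live session state. This sketch assumes the KG query has already returned candidate sessions, and that each session exposes the current item, the cart contents, and last-viewed times. All field names are hypothetical. The function assigns the three priority tiers above and keeps only the tiers you choose to alert.

```typescript
interface SessionState {
  userId: string;
  currentItemId: string | null;         // item on screen right now
  cartItemIds: string[];
  lastViewedAt: Record<string, number>; // itemId -> epoch ms of last view
}

type Priority = 1 | 2 | 3;

const RECENT_VIEW_MS = 10 * 60_000; // "viewed in last 10 min"

// Map one session to the router's priority tiers; null means the alert isn't relevant.
function scarcityPriority(s: SessionState, itemId: string, now: number): Priority | null {
  if (s.currentItemId === itemId) return 1;
  if (s.cartItemIds.includes(itemId)) return 2;
  const viewedAt = s.lastViewedAt[itemId];
  if (viewedAt !== undefined && now - viewedAt <= RECENT_VIEW_MS) return 3;
  return null;
}

// Keep only sessions at or above the chosen tier (1 = most urgent).
function selectRecipients(sessions: SessionState[], itemId: string, now: number, maxTier: Priority = 1): string[] {
  return sessions
    .filter((s) => {
      const p = scarcityPriority(s, itemId, now);
      return p !== null && p <= maxTier;
    })
    .map((s) => s.userId);
}
```

With the default `maxTier` of 1, only users currently viewing the item are alerted, which matches the result described above. Raising it widens the audience without touching the scoring.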
Don’t use knowledge graphs for high-frequency decisions. A graph query might take 50-200ms, which is too slow when processing thousands of events per second. Use the graph for relationships, use stream processors for speed.

Part 6: The LLM Agent Decision Layer

Now your agent receives clean, semantically meaningful signals matched to specific users. The final step is deciding how to act.

What the Agent Sees

The LLM receives a rolling context window containing:
interface AgentContext {
  user: {
    id: string;
    tier: 'standard' | 'vip';
    sessionDuration: number;
    cartValue: number;
    currentStep: string;
  };

  recentEvents: Array<{
    type: string;
    timestamp: string;
    payload: any;
  }>;

  signals: Array<{
    type: string;
    priority: 'low' | 'medium' | 'high';
    source: string;
    payload: any;
  }>;

  sessionHistory: {
    pageViews: number;
    productsViewed: number;
    cartAdditions: number;
    checkoutAttempts: number;
    previousPurchases: number;
  };

  constraints: {
    notificationsSentToday: number;
    lastInteractionTime: string;
    userPreferences: {
      proactiveAssistance: boolean;
      notificationFrequency: 'minimal' | 'normal' | 'maximum';
    };
  };
}

The Agent’s Policy Evaluation

The agent must answer several questions:
1. Should I react to this signal?
- Is it relevant to user's current context?
- Is the timing appropriate?
- Have I already suggested something similar?
2. If yes, should it be proactive or reactive?
- Proactive: Show suggestion without user asking
- Reactive: Prepare answer but wait for user to initiate
3. What’s the right tone and urgency?
- High urgency: "Only 2 left in your size - reserve now?"
- Medium: "A cheaper delivery slot just opened for tonight"
- Low: "By the way, you forgot eggs last time"
4. Have I exceeded frequency limits?
- Max 3 proactive suggestions per session
- Min 5 minutes between unprompted messages
- Respect user's notification preferences
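Frequency limits are too important to leave to the model alone, and they're cheap to enforce deterministically before (or after) the LLM call. This sketch assumes hypothetical per-preference caps, with 3 per session as the hard ceiling, plus the 5-minute minimum gap from the list above.

```typescript
interface ThrottleState {
  proactiveSentThisSession: number;
  lastProactiveAt: number | null; // epoch ms of the last unprompted message
  proactiveAssistance: boolean;   // user's opt-in flag
  notificationFrequency: "minimal" | "normal" | "maximum";
}

type ThrottleResult = { allowed: true } | { allowed: false; reason: string };

// Assumed caps per preference; "maximum" equals the hard limit of 3 per session.
const SESSION_CAPS = { minimal: 1, normal: 2, maximum: 3 } as const;
const MIN_GAP_MS = 5 * 60_000;

function checkThrottle(state: ThrottleState, now: number): ThrottleResult {
  if (!state.proactiveAssistance) return { allowed: false, reason: "user opted out" };
  if (state.proactiveSentThisSession >= SESSION_CAPS[state.notificationFrequency]) {
    return { allowed: false, reason: "session cap reached" };
  }
  if (state.lastProactiveAt !== null && now - state.lastProactiveAt < MIN_GAP_MS) {
    return { allowed: false, reason: "too soon since last message" };
  }
  return { allowed: true };
}
```

Running this check before calling the model also saves tokens: if the answer is "not allowed", there's no reason to ask the LLM whether to speak.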

Example: Multi-Signal Inference

The power comes from combining multiple signals:
Signal 1: User hovering on size M (behavior)
Signal 2: Stock for size M drops to 3 units (inventory)
Signal 3: User has this brand in wishlist (preference)
Signal 4: Item is frequently out of stock (historical)
Agent reasoning:
Confidence: HIGH (4 positive signals)
Urgency: MEDIUM-HIGH (stock scarcity + user interest)
Action: Proactive suggestion
Tone: Helpful, time-sensitive
Throttle: Allow (0 suggestions sent this session)

Response: "Size M is running low (only 3 left). Want me to reserve
one for the next 10 minutes while you finish browsing?"

Agent Prompt Template

You are a shopping assistant for an e-commerce platform. Your goal is to help users complete purchases without being annoying.

CONTEXT:
- User is on step: {currentStep}
- Cart value: ${cartValue}
- Session duration: {sessionDuration} minutes
- Notifications sent today: {notificationCount}

RECENT EVENTS:
{recentEvents}

NEW SIGNALS:
{signals}

POLICY:
- Maximum 3 proactive suggestions per session
- Minimum 5 minutes between suggestions
- Only suggest when confidence > 70%
- Respect user preference: {notificationFrequency}

TASK:
Evaluate the signals and decide:
1. Should you send a proactive message? (yes/no)
2. If yes, what should it say?
3. What action should it enable?
4. What priority level? (low/medium/high)

Output as JSON:
{
  "shouldAct": boolean,
  "message": string,
  "action": {
    "type": string,
    "payload": object
  },
  "priority": string,
  "reasoning": string
}
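The prompt asks for JSON, but nothing guarantees the model returns only JSON. This defensive parser is a sketch that assumes the reply contains one JSON object, possibly surrounded by prose. It rejects anything that doesn't match the expected shape, so the runtime can treat a bad reply as "don't act". Forcing a tool call with an input schema through the model API is another way to constrain the output.

```typescript
interface AgentDecision {
  shouldAct: boolean;
  message?: string;
  action?: { type: string; payload?: Record<string, unknown> };
  priority?: "low" | "medium" | "high";
  reasoning: string;
}

const PRIORITIES = new Set(["low", "medium", "high"]);

function isObject(v: unknown): v is Record<string, unknown> {
  return typeof v === "object" && v !== null && !Array.isArray(v);
}

// Take the outermost {...} span, then check every field the runtime depends on.
// Returns null (treat as "don't act") on any problem.
function parseAgentDecision(text: string): AgentDecision | null {
  const start = text.indexOf("{");
  const end = text.lastIndexOf("}");
  if (start === -1 || end <= start) return null;

  let data: unknown;
  try {
    data = JSON.parse(text.slice(start, end + 1));
  } catch {
    return null;
  }
  if (!isObject(data)) return null;

  const { shouldAct, message, action, priority, reasoning } = data;
  if (typeof shouldAct !== "boolean" || typeof reasoning !== "string") return null;
  if (priority !== undefined && (typeof priority !== "string" || !PRIORITIES.has(priority))) return null;
  if (action !== undefined && (!isObject(action) || typeof action.type !== "string")) return null;
  if (shouldAct && (typeof message !== "string" || message.length === 0)) return null;

  return data as unknown as AgentDecision;
}
```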

Part 7: Real-World Use Cases

Let me show you how this plays out in practice for a platform like Rohlik.cz (grocery delivery).

Use Case 1: Proactive Inventory Management

Scenario: User viewing organic milk, stock drops to 2 units
Event stream:
  1. user.product_view - User lands on organic milk page
  2. inventory.stock_update - Stock drops from 5 to 2
  3. user.dwell_increase - User spending time reading reviews
Agent action:
{
  "type": "ui.suggestion",
  "priority": "high",
  "message": "This organic milk is popular and only 2 left. Want me to add it to your cart?",
  "action": {
    "type": "cart.add_item",
    "productId": "prod_789",
    "quantity": 1
  }
}
Outcome: User adds to cart, avoids disappointment. Conversion rate increase: 18% for scarce items.

Use Case 2: Dynamic Delivery Optimization

Scenario: User stuck on delivery selection, new slot opens
Event stream:
  1. user.checkout_step - User reaches delivery selection
  2. user.long_dwell - 45 seconds with no action
  3. delivery.slot_opened - 6-8pm slot becomes available
  4. KG query shows user historically prefers evening slots
Agent action:
{
  "type": "ui.suggestion",
  "priority": "medium",
  "message": "A 6-8pm slot just opened (usually $3 cheaper than morning). Should I grab it for you?",
  "action": {
    "type": "delivery.reserve_slot",
    "slotId": "slot_18_20",
    "duration": 90
  }
}
Outcome: User accepts, slot reserved. Cart abandonment reduction: 23% at delivery step.

Use Case 3: Payment Error Prevention

Scenario: Payment provider degraded, user approaching checkout
Event stream:
  1. user.checkout_step - User enters payment step
  2. stripe.provider_degraded - Stripe reporting issues
  3. user.payment_method_selected - User selects card payment
Agent action:
{
  "type": "ui.notification",
  "priority": "high",
  "message": "Our card payment system is experiencing delays. PayPal is working perfectly - want to switch?",
  "action": {
    "type": "payment.switch_provider",
    "toProvider": "paypal"
  }
}
Outcome: User switches, successful payment. Failed payment rate reduced: 67% during incidents.

Use Case 4: Personalized Recall

Scenario: User who frequently orders eggs isn’t adding them
Event stream:
  1. user.session_start - Returning user logs in
  2. KG query shows user orders eggs 85% of the time
  3. user.approaching_checkout - Cart value > $50, no eggs
  4. No eggs in current cart
Agent action:
{
  "type": "ui.suggestion",
  "priority": "low",
  "message": "I noticed you usually order eggs - want to add them? Your favorite brand is in stock.",
  "action": {
    "type": "cart.suggest_item",
    "productId": "prod_eggs_organic",
    "reason": "frequently_purchased"
  }
}
Outcome: 42% of users add the item. Average order value increase: $4.80.
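The KG step in this use case ("user orders eggs 85% of the time") is a simple frequency over order history. Below is a sketch. It assumes past orders are available as lists of product IDs and uses illustrative cut-offs: an 80% share, and at least 5 past orders before anything counts as a habit.

```typescript
// Hypothetical order history shape: each past order is a list of product IDs.
type Order = string[];

// Share of past orders containing each product (each product counted once per order).
function purchaseShare(orders: Order[]): Map<string, number> {
  const counts = new Map<string, number>();
  for (const order of orders) {
    for (const productId of new Set(order)) {
      counts.set(productId, (counts.get(productId) ?? 0) + 1);
    }
  }
  const share = new Map<string, number>();
  for (const [productId, count] of counts) share.set(productId, count / orders.length);
  return share;
}

// Habitual products missing from the current cart, most habitual first.
function recallSuggestions(orders: Order[], cart: string[], minShare = 0.8, minOrders = 5): string[] {
  if (orders.length < minOrders) return []; // too little history to call anything a habit
  const inCart = new Set(cart);
  return [...purchaseShare(orders)]
    .filter(([productId, s]) => s >= minShare && !inCart.has(productId))
    .sort((a, b) => b[1] - a[1])
    .map(([productId]) => productId);
}
```

In this use case, the function would run when `user.approaching_checkout` fires. Its output would feed a low-priority `cart.suggest_item` action rather than an interruption.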

Use Case 5: Dietary Preference Learning

Scenario: User browsing dairy-free products consistently
Event stream:
  1. user.product_views - 8 dairy-free items viewed
  2. Pattern detection: 0 dairy products viewed
  3. user.category_switch - User switches to cheese category
  4. Inference: User likely dairy-intolerant
Agent action:
{
  "type": "ui.suggestion",
  "priority": "medium",
  "message": "I noticed you've been browsing dairy-free items. Want me to filter out all dairy products from your results?",
  "action": {
    "type": "preferences.set_dietary_filter",
    "filter": "dairy_free",
    "persistent": true
  }
}
Outcome: User accepts, better experience. Browse-to-purchase rate: +31%.

Part 8: Implementation Patterns

Let’s get tactical about how to build this.

Browser Event Capture

Use Web Workers to avoid blocking the main thread:
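// event-capture-worker.ts
// Events posted from the main thread: a type, a timestamp, and arbitrary extra fields.
type CapturedEvent = { type: string; timestamp: string } & Record<string, unknown>;

const eventQueue: CapturedEvent[] = [];
const BATCH_SIZE = 10;
const BATCH_INTERVAL = 1000; // 1 second

self.addEventListener('message', (e: MessageEvent<CapturedEvent>) => {
  eventQueue.push(e.data);

  if (eventQueue.length >= BATCH_SIZE) {
    flushEvents();
  }
});

setInterval(flushEvents, BATCH_INTERVAL);

function flushEvents() {
  if (eventQueue.length === 0) return;

  const batch = eventQueue.splice(0, BATCH_SIZE);

  fetch('https://events.example.com/batch', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(batch)
  }).catch(() => {
    // Network failure: re-queue for the next flush, capped so an offline tab can't grow memory without bound
    if (eventQueue.length < 1000) eventQueue.unshift(...batch);
  });
}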
Capture meaningful interactions:
// Main thread
import debounce from 'lodash/debounce';

const worker = new Worker('event-capture-worker.js');

// Page view
document.addEventListener('DOMContentLoaded', () => {
  worker.postMessage({
    type: 'page.view',
    url: window.location.href,
    referrer: document.referrer,
    timestamp: new Date().toISOString()
  });
});

// Scroll depth, as a percentage of the scrollable distance reached
let maxScroll = 0;
window.addEventListener('scroll', debounce(() => {
  const scrollable = document.documentElement.scrollHeight - window.innerHeight;
  if (scrollable <= 0) return; // page fits in the viewport
  const scrollPercent = (window.scrollY / scrollable) * 100;
  if (scrollPercent > maxScroll) {
    maxScroll = scrollPercent;
    worker.postMessage({
      type: 'scroll.depth',
      percent: Math.round(scrollPercent),
      timestamp: new Date().toISOString()
    });
  }
}, 500));

// Dwell detection on elements opted in via a data-track-dwell attribute.
// mouseenter doesn't bubble, so the listeners go on the elements themselves.
const dwellThreshold = 5000; // 5 seconds

document.querySelectorAll<HTMLElement>('[data-track-dwell]').forEach((el) => {
  let dwellTimer: number | undefined;

  el.addEventListener('mouseenter', () => {
    dwellTimer = window.setTimeout(() => {
      worker.postMessage({
        type: 'user.dwelling',
        element: el.id,
        duration: dwellThreshold,
        timestamp: new Date().toISOString()
      });
    }, dwellThreshold);
  });

  el.addEventListener('mouseleave', () => {
    window.clearTimeout(dwellTimer);
  });
});

// Cart state changes (dispatched by the app as a CustomEvent)
window.addEventListener('cart:updated', (e) => {
  const { itemCount, total } = (e as CustomEvent<{ itemCount: number; total: number }>).detail;
  worker.postMessage({
    type: 'cart.updated',
    itemCount,
    cartValue: total,
    timestamp: new Date().toISOString()
  });
});

Event Bus Selection

For an MVP, you can start simple.
Option 1: Postgres LISTEN/NOTIFY (MVP)
-- Create event log
CREATE TABLE event_log (
  id SERIAL PRIMARY KEY,
  type TEXT NOT NULL,
  source TEXT NOT NULL,
  payload JSONB NOT NULL,
  user_id TEXT,
  session_id TEXT,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

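-- Notify on insert
-- NOTIFY payloads must be shorter than 8000 bytes by default; for large events,
-- notify with NEW.id only and let listeners read the row from event_log.
CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify('events', row_to_json(NEW)::text);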
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER event_notify
AFTER INSERT ON event_log
FOR EACH ROW EXECUTE FUNCTION notify_event();
Option 2: Redis Streams (Production-Ready)
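import { Redis } from 'ioredis';

// Shape of events on the bus (matches the fields written below)
interface Event {
  type: string;
  source: string;
  payload: unknown;
  userId?: string;
  timestamp: string;
}

const redis = new Redis();
// A blocking XREAD ties up its connection, so the consumer gets a dedicated one
const consumerConn = new Redis();

// Publisher
async function publishEvent(event: Event) {
  await redis.xadd(
    'events',
    '*',
    'type', event.type,
    'source', event.source,
    'payload', JSON.stringify(event.payload),
    'userId', event.userId || '',
    'timestamp', event.timestamp
  );
}

// Stream entries arrive as a flat [field, value, field, value, ...] array
function parseEvent(fields: string[]): Event {
  const record: Record<string, string> = {};
  for (let i = 0; i < fields.length; i += 2) record[fields[i]] = fields[i + 1];
  return {
    type: record.type,
    source: record.source,
    payload: JSON.parse(record.payload),
    userId: record.userId || undefined,
    timestamp: record.timestamp
  };
}

// Consumer
async function consumeEvents(callback: (event: Event) => void) {
  let lastId = '$'; // Start from now; afterwards resume from the last ID seen

  while (true) {
    const results = await consumerConn.xread(
      'BLOCK', 0,
      'STREAMS', 'events', lastId
    );

    if (results) {
      for (const [, messages] of results) {
        for (const [id, fields] of messages) {
          callback(parseEvent(fields));
          lastId = id;
        }
      }
    }
  }
}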
Option 3: Kafka (High Scale)
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'event-system',
  brokers: ['localhost:9092']
});

// Publisher (top-level await: run this as an ES module)
const producer = kafka.producer();
await producer.connect(); // kafkajs clients must connect before sending

async function publishEvent(event: Event) {
  await producer.send({
    topic: 'user-events',
    messages: [{
      key: event.userId ?? null, // same key, same partition: a user's events stay ordered
      value: JSON.stringify(event),
      headers: {
        'event-type': event.type,
        'event-source': event.source
      }
    }]
  });
}

// Consumer
const consumer = kafka.consumer({ groupId: 'agent-runtime' });

await consumer.connect();
await consumer.subscribe({ topic: 'user-events', fromBeginning: false });

await consumer.run({
  eachMessage: async ({ message }) => {
    if (!message.value) return; // skip empty messages
    const event = JSON.parse(message.value.toString());
    await processEvent(event);
  }
});

Signal Derivation Implementation

// signal-deriver.ts
import { EventEmitter } from 'events';

interface RawEvent {
  type: string;
  source: string;
  payload: any;
  timestamp: string;
}

interface Signal {
  type: string;
  priority: 'low' | 'medium' | 'high';
  payload: any;
  derivedFrom: string[];
}

class SignalDeriver extends EventEmitter {
  private eventBuffer: Map<string, RawEvent[]> = new Map();
  private dedupeWindow = 3000; // 3 seconds

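  processRawEvent(event: RawEvent) {
    // Buffer by entity (zone + slot, or product + variant), not by the full payload:
    // repeated updates to one slot carry different capacities but must collapse into one signal.
    const p = event.payload ?? {};
    const key = `${event.type}:${p.zone ?? p.productId ?? ''}:${p.slot ?? p.variant ?? ''}`;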

    if (!this.eventBuffer.has(key)) {
      this.eventBuffer.set(key, []);

      // Set timeout to process buffer
      setTimeout(() => {
        this.deriveSignal(key);
      }, this.dedupeWindow);
    }

    this.eventBuffer.get(key)!.push(event);
  }

  private deriveSignal(bufferKey: string) {
    const events = this.eventBuffer.get(bufferKey) || [];
    this.eventBuffer.delete(bufferKey);

    if (events.length === 0) return;

    // Example: Delivery slot events
    if (events[0].type === 'delivery.slot_capacity_changed') {
      const signal: Signal = {
        type: 'delivery.slot_opened',
        priority: 'medium',
        payload: {
          zone: events[0].payload.zone,
          slot: events[0].payload.slot,
          changeCount: events.length,
          finalCapacity: events[events.length - 1].payload.capacity
        },
        derivedFrom: events.map(e => e.type)
      };

      this.emit('signal', signal);
    }

    // Example: Stock scarcity
    if (events[0].type === 'inventory.stock_update') {
      const latestStock = events[events.length - 1].payload.stock;

      if (latestStock <= 5) {
        const signal: Signal = {
          type: 'inventory.scarcity_alert',
          priority: latestStock <= 2 ? 'high' : 'medium',
          payload: {
            productId: events[0].payload.productId,
            variant: events[0].payload.variant,
            stock: latestStock,
            trend: this.calculateStockTrend(events)
          },
          derivedFrom: events.map(e => e.type)
        };

        this.emit('signal', signal);
      }
    }
  }

  private calculateStockTrend(events: RawEvent[]): 'declining' | 'stable' | 'increasing' {
    if (events.length < 2) return 'stable';

    const first = events[0].payload.stock;
    const last = events[events.length - 1].payload.stock;

    if (last < first) return 'declining';
    if (last > first) return 'increasing';
    return 'stable';
  }
}

// Usage
const deriver = new SignalDeriver();

deriver.on('signal', (signal: Signal) => {
  console.log('Derived signal:', signal);
  // Pass to relevance router
  routeSignalToRelevantUsers(signal);
});

// Feed raw events
consumeEvents((event) => {
  deriver.processRawEvent(event);
});

Agent Runtime Implementation

// agent-runtime.ts
import Anthropic from '@anthropic-ai/sdk';

const anthropic = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY
});

interface AgentDecision {
  shouldAct: boolean;
  message?: string;
  action?: {
    type: string;
    payload: any;
  };
  priority?: 'low' | 'medium' | 'high';
  reasoning: string;
}

async function evaluateSignal(
  signal: Signal,
  user: User,
  context: AgentContext
): Promise<AgentDecision> {
  const prompt = buildAgentPrompt(signal, user, context);

  const message = await anthropic.messages.create({
    model: 'claude-3-5-sonnet-20241022',
    max_tokens: 1024,
    messages: [{
      role: 'user',
      content: prompt
    }]
  });

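  // Fail closed: if the model wraps its JSON in prose or returns something unparseable, don't act.
  const block = message.content[0];
  const text = block?.type === 'text' ? block.text : '';
  let decision: AgentDecision;
  try {
    decision = JSON.parse(text.slice(text.indexOf('{'), text.lastIndexOf('}') + 1));
  } catch {
    return { shouldAct: false, reasoning: 'Unparseable model output' };
  }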

  // Validation and safety checks
  if (decision.shouldAct) {
    // Check frequency limits
    if (context.constraints.notificationsSentToday >= 3) {
      decision.shouldAct = false;
      decision.reasoning = 'Frequency limit exceeded';
    }

    // Check time since last interaction
    const timeSinceLastInteraction =
      Date.now() - new Date(context.constraints.lastInteractionTime).getTime();

    if (timeSinceLastInteraction < 5 * 60 * 1000) { // 5 minutes
      decision.shouldAct = false;
      decision.reasoning = 'Too soon since last interaction';
    }
  }

  return decision;
}

function buildAgentPrompt(
  signal: Signal,
  user: User,
  context: AgentContext
): string {
  return `You are a shopping assistant. Evaluate this signal and decide how to help.

USER CONTEXT:
- Current step: ${user.currentStep}
- Cart value: $${context.user.cartValue}
- Session duration: ${context.user.sessionDuration} minutes
- Tier: ${user.tier}
- Notifications today: ${context.constraints.notificationsSentToday}/3

SIGNAL:
Type: ${signal.type}
Priority: ${signal.priority}
Payload: ${JSON.stringify(signal.payload, null, 2)}

RECENT EVENTS (last 5):
${context.recentEvents.slice(-5).map(e => `- ${e.type} at ${e.timestamp}`).join('\n')}

SESSION HISTORY:
- Products viewed: ${context.sessionHistory.productsViewed}
- Cart additions: ${context.sessionHistory.cartAdditions}
- Previous purchases: ${context.sessionHistory.previousPurchases}

POLICY:
- Max 3 proactive messages per session
- Min 5 minutes between messages
- Respect user preference: ${context.constraints.userPreferences.notificationFrequency}
- Only act if confidence > 70%

TASK:
Decide whether to send a proactive message and what it should say.

Respond in JSON:
{
  "shouldAct": boolean,
  "message": "string (if shouldAct is true)",
  "action": {
    "type": "string",
    "payload": {}
  },
  "priority": "low|medium|high",
  "reasoning": "explain your decision"
}`;
}

// Execute the decision
async function executeAgentDecision(
  decision: AgentDecision,
  userId: string
) {
  if (!decision.shouldAct) {
    console.log(`[${userId}] No action taken:`, decision.reasoning);
    return;
  }

  // Send command to browser via WebSocket
  const ws = getWebSocketForUser(userId);

  ws.send(JSON.stringify({
    type: decision.action?.type,
    message: decision.message,
    priority: decision.priority,
    payload: decision.action?.payload
  }));

  // Log interaction
  await logInteraction({
    userId,
    type: 'proactive_message',
    message: decision.message,
    action: decision.action,
    timestamp: new Date().toISOString()
  });
}

Part 9: Design Considerations

User Experience Principles

1. Frequency throttling is critical
| Notification frequency | User perception | Conversion impact |
|---|---|---|
| 0-1 per session | Helpful, unobtrusive | +15-20% |
| 2-3 per session | Acceptable | +8-12% |
| 4-5 per session | Slightly annoying | +2-5% |
| 6+ per session | Spam, intrusive | -10 to -25% |
2. Priority-based interruption
Classify every proactive message by priority and interrupt accordingly:
  • High priority: Scarcity alerts, payment errors, time-sensitive opportunities
  • Medium priority: Delivery optimizations, personalized suggestions
  • Low priority: Preference learning, non-urgent reminders
Low-priority messages should wait for natural pauses (page transitions, idle moments).
3. User control
Always provide:
  • Dismiss button on suggestions
  • “Don’t show suggestions like this” option
  • Settings panel for notification preferences
  • Complete opt-out capability
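These rules are easier to trust when code enforces them before a message reaches the browser, rather than leaving them to the LLM alone. Below is a minimal sketch that assumes the limits from the agent prompt's POLICY section: 3 proactive messages per session, at least 5 minutes apart. The `DeliveryGate` class is hypothetical, and two behaviors are design choices of this sketch, not rules from the text. First, high-priority messages skip the spacing rule but still count toward the cap. Second, waiting messages are deferred rather than discarded:

```typescript
type Priority = 'low' | 'medium' | 'high';

interface ProactiveMessage {
  priority: Priority;
  text: string;
}

type GateResult = 'deliver' | 'defer' | 'drop';

// Hypothetical per-session gate enforcing throttling, priority, and opt-out
class DeliveryGate {
  private sent = 0;
  private lastSentAt = -Infinity;
  private deferred: ProactiveMessage[] = [];
  private readonly maxPerSession: number;
  private readonly minGapMs: number;
  private readonly optedOut: boolean;

  constructor(opts: { maxPerSession?: number; minGapMs?: number; optedOut?: boolean } = {}) {
    this.maxPerSession = opts.maxPerSession ?? 3;
    this.minGapMs = opts.minGapMs ?? 5 * 60 * 1000;
    this.optedOut = opts.optedOut ?? false;
  }

  // Decide what to do with a message at time `now` (ms). `userIdle` is true
  // at natural pauses such as page transitions or idle moments.
  offer(msg: ProactiveMessage, now: number, userIdle: boolean): GateResult {
    if (this.optedOut || this.sent >= this.maxPerSession) return 'drop';

    const tooSoon = now - this.lastSentAt < this.minGapMs;
    const mustWait =
      (msg.priority === 'low' && !userIdle) || // low priority waits for a pause
      (tooSoon && msg.priority !== 'high'); // only high priority skips spacing
    if (mustWait) {
      this.deferred.push(msg);
      return 'defer';
    }

    this.sent += 1;
    this.lastSentAt = now;
    return 'deliver';
  }

  // At the next natural pause, take the oldest deferred message (if any)
  // and offer it again
  takeDeferred(): ProactiveMessage | undefined {
    return this.deferred.shift();
  }
}
```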

Performance Optimization

Use Server-Sent Events (SSE) for lower complexity:
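// Instead of WebSocket for the inbound (server-to-browser) channel
app.get('/api/actions/stream', (req, res) => {
  // In production, derive userId from the authenticated session rather than
  // trusting an unauthenticated query parameter
  const userId = typeof req.query.userId === 'string' ? req.query.userId : undefined;
  if (!userId) {
    res.status(400).end();
    return;
  }

  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders(); // open the stream immediately

  // Subscribe to user-specific actions (assumes subscribeToUserActions
  // returns an unsubscribe function)
  const unsubscribe = subscribeToUserActions(userId, (action) => {
    res.write(`data: ${JSON.stringify(action)}\n\n`);
  });

  // Stop writing when the client disconnects so subscriptions don't leak
  req.on('close', unsubscribe);
});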
Advantages:
  • Simpler than WebSocket for one-way communication
  • Automatic reconnection
  • Works with HTTP/2 multiplexing
  • Better for low-frequency updates
Batching for slow networks:
const eventBatcher = {
  queue: [] as Event[],
  timeout: null as ReturnType<typeof setTimeout> | null, // portable across browser and Node

  add(event: Event) {
    this.queue.push(event);

    if (!this.timeout) {
      this.timeout = setTimeout(() => {
        this.flush();
      }, 100); // 100ms batch window
    }

    // Force flush if queue is large
    if (this.queue.length >= 20) {
      this.flush();
    }
  },

  flush() {
    if (this.queue.length === 0) return;

    sendEventBatch(this.queue);
    this.queue = [];

    if (this.timeout) {
      clearTimeout(this.timeout);
      this.timeout = null;
    }
  }
};

Privacy & Security

Event data minimization:
// DON'T capture everything
const badEvent = {
  type: 'form.input',
  field: 'credit-card',
  value: '4532-1234-5678-9010', // ❌ Never capture PII
  timestamp: '2025-01-13T20:51:00Z'
};

// DO capture intent without sensitive data
const goodEvent = {
  type: 'checkout.step_completed',
  step: 'payment',
  method: 'card', // ✓ High-level intent only
  timestamp: '2025-01-13T20:51:00Z'
};
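One way to enforce "intent, not data" is to allowlist fields per event type at the capture point. New or unexpected fields are then dropped by default instead of leaking. Below is a minimal sketch. The `ALLOWED_FIELDS` map and the `minimizeEvent` helper are hypothetical, and the event types mirror the examples above:

```typescript
type RawEvent = { type: string; timestamp: string; [key: string]: unknown };

// Hypothetical allowlist: only these fields survive, per event type
const ALLOWED_FIELDS: Record<string, readonly string[]> = {
  'checkout.step_completed': ['step', 'method'],
  'cart.updated': ['itemCount', 'categoryIds'],
  'page.viewed': ['path'],
};

// Returns a copy containing only allowlisted fields. Returns null for event
// types with no allowlist (e.g. raw form input), which are never forwarded.
function minimizeEvent(event: RawEvent): RawEvent | null {
  const allowed = ALLOWED_FIELDS[event.type];
  if (!allowed) return null;

  const minimized: RawEvent = { type: event.type, timestamp: event.timestamp };
  for (const field of allowed) {
    if (field in event) minimized[field] = event[field];
  }
  return minimized;
}
```

An allowlist fails safe: a developer who adds a sensitive field to an event has to opt it in explicitly.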
Secure WebSocket connections:
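// Use WSS (WebSocket Secure). The browser WebSocket API cannot set custom
// headers such as Authorization, so pass a short-lived token in the query
// string (it may appear in logs) or rely on a session cookie sent with the
// upgrade request.
const ws = new WebSocket(
  `wss://events.example.com/client?token=${encodeURIComponent(authToken)}`
);

// Validate tokens server-side (e.g. with the `ws` package's WebSocketServer)
wss.on('connection', (ws, req) => {
  const token = new URL(req.url ?? '/', 'https://events.example.com')
    .searchParams.get('token');

  const user = token ? verifyToken(token) : null;
  if (!user) {
    ws.close(1008, 'Unauthorized'); // 1008 = policy violation
    return;
  }

  // Associate connection with user; remove it when this socket closes
  connections.set(user.id, ws);
  ws.on('close', () => {
    if (connections.get(user.id) === ws) connections.delete(user.id);
  });
});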
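`verifyToken` is used above but not defined. One minimal option is an HMAC-signed, expiring token built with Node's `node:crypto` module. Expiry matters here because browsers cannot attach an `Authorization` header to a WebSocket handshake, so tokens often travel in places where they can be logged. The token format (`userId.expiresAtMs.signature`), the `issueToken` helper, and the `WS_TOKEN_SECRET` variable are assumptions for illustration. A standard JWT library is a reasonable alternative:

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hypothetical secret source; load it from configuration in a real deployment
const SECRET = process.env.WS_TOKEN_SECRET ?? 'dev-only-secret';

function sign(data: string): string {
  return createHmac('sha256', SECRET).update(data).digest('base64url');
}

// Issue `userId.expiresAtMs.signature` (assumes user IDs contain no '.')
function issueToken(userId: string, ttlMs = 60_000, now = Date.now()): string {
  const payload = `${userId}.${now + ttlMs}`;
  return `${payload}.${sign(payload)}`;
}

// Return the user if the signature matches and the token has not expired
function verifyToken(token: string | undefined, now = Date.now()): { id: string } | null {
  if (!token) return null;
  const parts = token.split('.');
  if (parts.length !== 3) return null;
  const [userId, expiresAt, signature] = parts;

  const expected = Buffer.from(sign(`${userId}.${expiresAt}`));
  const actual = Buffer.from(signature);
  // timingSafeEqual throws on length mismatch, so compare lengths first
  if (actual.length !== expected.length || !timingSafeEqual(actual, expected)) return null;

  if (!(Number(expiresAt) > now)) return null;
  return { id: userId };
}
```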
Data retention:
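-- Event log; rows are expired after 30 days by the scheduled DELETE below
-- (Postgres has no built-in row TTL)
CREATE TABLE event_log (
  id SERIAL PRIMARY KEY,
  type TEXT NOT NULL,
  payload JSONB NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Index on created_at so the cleanup query avoids a full table scan
CREATE INDEX idx_event_log_created_at ON event_log(created_at);

-- Run daily via a scheduler (cron, pg_cron, or an application job)
DELETE FROM event_log WHERE created_at < NOW() - INTERVAL '30 days';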

Part 10: The Maturity Spectrum

Organizations evolve through predictable stages when building event-driven AI:

Level 0: Traditional Reactive Chatbot

Characteristics:
  • User asks question → Bot responds
  • No awareness of context
  • No proactive assistance
  • Generic, one-size-fits-all responses
Conversion impact: Baseline

Level 1: Event-Aware Chatbot

Characteristics:
  • Bot knows what page user is on
  • Can see cart contents
  • Context-aware responses
  • Still waits for user to ask
Implementation:
  • Basic browser event capture
  • Simple context passing to LLM
  • No signal derivation
Conversion impact: +5-8%

Level 2: Proactive Assistant

Characteristics:
  • Anticipates needs based on behavior
  • Sends helpful suggestions unprompted
  • Aware of inventory and system state
  • Basic relevance filtering
Implementation:
  • Full event streaming architecture
  • Signal derivation layer
  • Simple relevance routing
  • LLM agent decision layer
Conversion impact: +15-20%
This is where you want to be: most of the value at reasonable complexity.

Level 3: Coordinated Intelligence

Characteristics:
  • Server-wide optimization
  • Multi-user coordination
  • Scarcity handling with centralized locking
  • Knowledge graph integration
  • Sophisticated priority routing
Implementation:
  • Everything from Level 2
  • Knowledge graph layer
  • Advanced relevance scoring
  • Conflict resolution
  • Cross-session learning
Conversion impact: +25-35%
This level is for high-scale, high-value operations such as grocery delivery, flash sales, and limited-inventory businesses.

Getting Started: Your MVP

Don’t try to build everything at once. Here’s a practical 4-week MVP plan:

Week 1: Event Capture

Goal: Start streaming browser events
Tasks:
  • Implement Web Worker event capture
  • Set up basic WebSocket outbound channel
  • Capture 3-5 key events (page view, cart update, checkout step)
  • Log events to database
Success metric: Events flowing to backend

Week 2: Signal Derivation

Goal: Turn raw events into semantic signals
Tasks:
  • Build a simple signal deriver for 1-2 event types
  • Implement deduplication (3-second window)
  • Create “user stuck” detection (dwell time > 30s)
  • Emit structured signals
Success metric: 1 signal type working reliably
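Deduplication and "user stuck" detection both fit in a small stateful deriver. Below is a minimal sketch that uses the thresholds from the task list: a 3-second dedup window and a 30-second dwell. The event and signal shapes and the `SignalDeriver` class are hypothetical. Treating any non-page-view interaction as progress that resets the dwell clock is a design choice of this sketch:

```typescript
// Hypothetical raw event and derived signal shapes
interface BrowserEvent {
  type: string; // e.g. 'page.view', 'cart.update', 'checkout.step'
  key: string; // what the event is about, e.g. a page path or SKU
  at: number; // epoch milliseconds
}

interface Signal {
  type: 'user_stuck';
  key: string;
  dwellMs: number;
  at: number;
}

class SignalDeriver {
  private lastAccepted = new Map<string, number>(); // dedup key -> timestamp
  private currentPage: { key: string; since: number; reported: boolean } | null = null;
  private readonly dedupWindowMs = 3_000;
  private readonly stuckAfterMs = 30_000;

  // Returns false for duplicates within the window of the last accepted event
  accept(event: BrowserEvent): boolean {
    const dedupKey = `${event.type}:${event.key}`;
    const prev = this.lastAccepted.get(dedupKey);
    if (prev !== undefined && event.at - prev < this.dedupWindowMs) return false;
    this.lastAccepted.set(dedupKey, event.at);

    if (event.type === 'page.view') {
      this.currentPage = { key: event.key, since: event.at, reported: false };
    } else if (this.currentPage) {
      this.currentPage.since = event.at; // interaction counts as progress
    }
    return true;
  }

  // Call periodically; emits at most one "stuck" signal per page view
  tick(now: number): Signal | null {
    const page = this.currentPage;
    if (!page || page.reported) return null;
    const dwellMs = now - page.since;
    if (dwellMs <= this.stuckAfterMs) return null;
    page.reported = true;
    return { type: 'user_stuck', key: page.key, dwellMs, at: now };
  }
}
```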

Week 3: Agent Decision Layer

Goal: LLM evaluates signals and decides actions
Tasks:
  • Set up Claude API integration
  • Build agent prompt template
  • Implement frequency throttling (max 2 per session)
  • Create decision logging
Success metric: Agent makes contextually appropriate decisions

Week 4: Action Execution

Goal: Close the loop by sending actions to the browser
Tasks:
  • Implement WebSocket inbound channel
  • Build browser action executor
  • Create UI for displaying suggestions
  • Add user controls (dismiss, preferences)
Success metric: End-to-end flow working for 1 use case
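The browser action executor mainly has to dispatch incoming commands by type and honor user controls such as “Don’t show suggestions like this”. Below is a minimal sketch that assumes actions arrive as the JSON objects sent by `executeAgentDecision` (`type`, `message`, `priority`, `payload`). The `ActionExecutor` class, its handler registry, and the suppression set are hypothetical. Rendering is left to injected handlers, so the dispatch logic runs without a DOM:

```typescript
// Shape of a command arriving on the inbound channel
interface ActionCommand {
  type?: string;
  message?: string;
  priority?: 'low' | 'medium' | 'high';
  payload?: Record<string, unknown>;
}

type Handler = (cmd: ActionCommand) => void;

class ActionExecutor {
  private handlers = new Map<string, Handler>();
  // Action types the user asked not to see again
  private suppressed = new Set<string>();

  register(type: string, handler: Handler): void {
    this.handlers.set(type, handler);
  }

  suppress(type: string): void {
    this.suppressed.add(type);
  }

  // Parse a raw socket message and run its handler; returns whether it ran
  execute(raw: string): boolean {
    let cmd: ActionCommand | null;
    try {
      cmd = JSON.parse(raw);
    } catch {
      return false; // ignore malformed messages rather than breaking the UI
    }
    if (!cmd || typeof cmd !== 'object' || typeof cmd.type !== 'string') return false;
    const type = cmd.type;
    if (this.suppressed.has(type)) return false;

    const handler = this.handlers.get(type);
    if (!handler) return false; // unknown action types are ignored
    handler(cmd);
    return true;
  }
}
```

In the browser, `execute` would be called from the WebSocket `message` handler (or an SSE `onmessage`). `suppress` would be wired to the “Don’t show suggestions like this” button.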

Measuring Success

Track these metrics before and after launch:
Metric                        | Baseline | Target (8 weeks)
------------------------------|----------|-----------------
Cart abandonment rate         | 70%      | 55%
Checkout completion           | 30%      | 45%
Average session duration      | 4.2 min  | 6.5 min
Products viewed per session   | 8.3      | 12.1
Average order value           | $68      | $82
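In this table, cart abandonment and checkout completion are complements (70% + 30%, 55% + 45%). That suggests both are measured over sessions that created a cart. Below is a minimal sketch that computes the five metrics from per-session records. The `SessionRecord` shape is hypothetical, and your analytics definitions may differ, for example by counting abandonment per cart rather than per session:

```typescript
// Hypothetical per-session record exported from analytics
interface SessionRecord {
  durationMin: number;
  productsViewed: number;
  createdCart: boolean;
  orderValue: number | null; // null when no order was placed
}

interface Metrics {
  cartAbandonmentRate: number;
  checkoutCompletion: number;
  avgSessionDurationMin: number;
  productsViewedPerSession: number;
  averageOrderValue: number;
}

const mean = (xs: number[]): number =>
  xs.length === 0 ? 0 : xs.reduce((sum, x) => sum + x, 0) / xs.length;

function computeMetrics(sessions: SessionRecord[]): Metrics {
  const withCart = sessions.filter((s) => s.createdCart);
  const orders = withCart
    .map((s) => s.orderValue)
    .filter((v): v is number => v !== null);

  // Completion = share of carts that became orders; abandonment is the rest
  const checkoutCompletion = withCart.length === 0 ? 0 : orders.length / withCart.length;

  return {
    cartAbandonmentRate: withCart.length === 0 ? 0 : 1 - checkoutCompletion,
    checkoutCompletion,
    avgSessionDurationMin: mean(sessions.map((s) => s.durationMin)),
    productsViewedPerSession: mean(sessions.map((s) => s.productsViewed)),
    averageOrderValue: mean(orders),
  };
}
```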

Conclusion

The difference between a reactive chatbot and an always-on AI assistant isn’t just about technology—it’s about architecture. When your AI can:
  • See what users are doing in real-time
  • Know what’s happening with inventory, delivery, and systems
  • Anticipate needs before users get stuck
  • Act proactively at the right moment
You transform the customer experience from “I hope someone helps me” to “this platform understands what I need.”

The technology exists. Event streaming is proven. LLMs are powerful enough. WebSockets are mature. What matters now is implementation. Start with one use case. Get the architecture right. Measure impact. Scale systematically.

The companies building this today will have a 2-3 year advantage before it becomes table stakes. Don’t wait.

Next in This Series

This article covered the architecture end to end. In future articles, I’ll go deeper on:
  1. Building the Signal Derivation Layer - Code-level implementation patterns for real-time event processing
  2. Knowledge Graph Design for E-Commerce - Schema design, query patterns, and integration strategies
  3. LLM Agent Orchestration - Prompt engineering, context management, and decision quality optimization
  4. Production Operations - Monitoring, debugging, scaling, and cost optimization
Want help implementing this for your platform? Book a discovery call.