HFT Pipeline Architecture: Complete Guide

Date: 2024-12-18
Status: Production-Ready Design
Target: <200ms execution (quote → profit in wallet)


Table of Contents

  1. Overview
  2. Quote Service Integration
  3. NATS 6-Stream Architecture
  4. Event Flow: Quote-Service → Scanner → Planner → Executor
  5. TwoHopArbitrageEvent Design
  6. Scanner Implementation
  7. Planner Implementation (Future)
  8. Executor Implementation (Future)
  9. Deployment & Configuration
  10. Performance Targets
  11. Migration Checklist

Overview

Four-Stage Pipeline

QUOTE-SERVICE (Go) (Quote Generation - <10ms per quote)
    ↓ Queries pools, caches quotes, streams via gRPC/NATS
    ↓ Publishes: MARKET_DATA stream (optional NATS) + gRPC streaming
SCANNER (Fast Pre-Filter - <10ms)
    ↓ Filters noise, detects patterns
    ↓ Publishes: OPPORTUNITIES stream
PLANNER (Deep Validation - 50-100ms with RPC simulation)
    ↓ Validates profitability, chooses execution method
    ↓ Publishes: PLANNED stream
EXECUTOR (Dumb Submission - <20ms)
    ↓ Just signs and sends
    ↓ Publishes: EXECUTED stream

Key Principle: Quote-Service = data (cached), Scanner = detect (cheap), Planner = validate (expensive), Executor = execute (dumb)

Performance Goals

| Stage    | Current | Target    | Achieved  |
|----------|---------|-----------|-----------|
| Scanner  | 8ms     | <10ms     |           |
| Planner  | -       | 50-100ms  | ⏳ Future |
| Executor | -       | <20ms     | ⏳ Future |
| Total    | -       | <200ms    | 🎯 Goal   |

NATS 6-Stream Architecture (FlatBuffers)

Stream Overview

┌────────────────────────────────────────────────────────────┐
│                  NATS Server (Single Instance)              │
├────────────────────────────────────────────────────────────┤
│                                                             │
│  1. MARKET_DATA (Quote-Service → market data)               │
│     ├─ Throughput: 10,000 events/sec                        │
│     ├─ Retention: 1 hour                                    │
│     ├─ Storage: Memory (sub-1ms latency)                    │
│     ├─ Format: FlatBuffers (zero-copy)                      │
│     └─ Subjects: market.swap_route.*, market.price.*        │
│                                                             │
│  2. OPPORTUNITIES (Scanner → detected opportunities)        │
│     ├─ Throughput: 500 events/sec                           │
│     ├─ Retention: 24 hours                                  │
│     ├─ Storage: File (2-5ms latency)                        │
│     ├─ Format: FlatBuffers (zero-copy)                      │
│     └─ Subjects: opportunity.two_hop.*, opportunity.triangular.* │
│                                                             │
│  3. PLANNED (Planner → validated plans)                      │
│     ├─ Throughput: 50 events/sec                            │
│     ├─ Retention: 1 hour (ephemeral)                        │
│     ├─ Storage: File (2-5ms latency)                        │
│     ├─ Format: FlatBuffers (zero-copy)                      │
│     └─ Subjects: execution.planned.*, execution.rejected.*  │
│                                                             │
│  4. EXECUTED (Executor → execution results)                 │
│     ├─ Throughput: 50 events/sec                            │
│     ├─ Retention: 7 days (audit trail)                      │
│     ├─ Storage: File (5-10ms latency)                       │
│     ├─ Format: FlatBuffers (zero-copy)                      │
│     └─ Subjects: executed.completed.*, executed.failed.*    │
│                                                             │
│  5. METRICS (Performance metrics)                           │
│     ├─ Throughput: 1,000-5,000 events/sec                   │
│     ├─ Retention: 1 hour                                    │
│     ├─ Storage: Memory (sub-1ms latency)                    │
│     ├─ Format: FlatBuffers (zero-copy)                      │
│     └─ Subjects: metrics.latency.*, metrics.throughput.*    │
│                                                             │
│  6. SYSTEM (Lifecycle & health events)                      │
│     ├─ Throughput: 1-10 events/sec                          │
│     ├─ Retention: 30 days (audit trail)                     │
│     ├─ Storage: File (persistence critical)                 │
│     ├─ Format: FlatBuffers (zero-copy)                      │
│     └─ Subjects: system.lifecycle.*, system.health.*        │
│                                                             │
└────────────────────────────────────────────────────────────┘

FlatBuffers Performance Benefits

| Metric            | JSON          | FlatBuffers   | Improvement         |
|-------------------|---------------|---------------|---------------------|
| Encode Time       | 5-10μs        | 1-2μs         | 5-10x faster        |
| Decode Time       | 8-15μs        | 0.1-0.5μs     | 20-150x faster      |
| Message Size      | 450-600 bytes | 300-400 bytes | 30% smaller         |
| Zero-Copy Read    | ❌ No         | ✅ Yes        | Eliminates copies   |
| Memory Allocation | ❌ High       | ✅ Minimal    | Reduces GC pressure |

Expected Latency Reduction: 10-20ms per event (5-10% of 200ms budget)

Why 6 Separate Streams?

1. Different Throughput Rates

Quote-Service → 10,000 events/sec (quotes to MARKET_DATA)
                  ↓ (Scanner consumes via gRPC or NATS)
Scanner       →    500 events/sec (98% filtered by detectArbitrage)
                  ↓ (opportunities to OPPORTUNITIES)
Planner       →     50 events/sec (90% rejected after RPC simulation)
                  ↓ (validated plans to PLANNED)
Executor      →     50 events/sec (results to EXECUTED)
                  ↓ (metrics aggregation)
Metrics       →  1,000-5,000 events/sec (latency, throughput, PnL to METRICS)
                  ↓ (lifecycle events)
System        →      1-10 events/sec (start, shutdown, kill switch to SYSTEM)

Filtering efficiency: 99.5% (10k quotes → 50 executions/sec)
Metrics throughput: High-frequency performance data (every trade + periodic)
System throughput: Low-frequency operational events (lifecycle only)
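The filtering ratio quoted above follows directly from the stage rates:

```typescript
// 10k quotes/sec in, 50 executions/sec out → 99.5% of events are
// filtered out before reaching the Executor
const quotesPerSec = 10_000;
const executionsPerSec = 50;
const filteredPct = (1 - executionsPerSec / quotesPerSec) * 100; // 99.5
```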

2. Different Retention Policies

MARKET_DATA:     1 hour   (noisy raw data, delete fast)
OPPORTUNITIES:  24 hours  (debugging opportunity detection)
PLANNED:         1 hour   (execution plans, short-lived)
EXECUTED:        7 days   (audit trail, PnL reporting)
METRICS:         1 hour   (short-lived, aggregated to Prometheus)
SYSTEM:         30 days   (regulatory compliance, audit trail)

Benefit:

  • Save disk space: PLANNED/METRICS are ephemeral (execute once and done)
  • Long retention: EXECUTED (7 days) for PnL analysis, SYSTEM (30 days) for compliance
  • Operational clarity: System lifecycle events separate from trading events

3. Independent Replay

# Replay opportunities to test new strategy logic
nats stream replay OPPORTUNITIES --since "2024-12-17T00:00:00Z"

# Replay execution plans to test executor changes (dry-run mode)
nats stream replay PLANNED --since "2024-12-17T00:00:00Z"

# Replay execution results for PnL recalculation
nats stream replay EXECUTED --since "2024-12-17T00:00:00Z"

4. Complete Fault Isolation

If Scanner crashes:
├─ MARKET_DATA: ⚠️ No new data (expected)
├─ OPPORTUNITIES: ⚠️ No new opportunities (expected)
├─ PLANNED: ✅ Planner buffers old opportunities
└─ EXECUTED: ✅ Executor keeps processing

If Planner crashes:
├─ MARKET_DATA: ✅ Scanner unaffected
├─ OPPORTUNITIES: ✅ Buffer fills with new opportunities
├─ PLANNED: ⚠️ No new plans (expected)
└─ EXECUTED: ✅ Executor processes buffered plans

If Executor crashes:
├─ MARKET_DATA: ✅ Scanner unaffected
├─ OPPORTUNITIES: ✅ Planner unaffected
├─ PLANNED: ✅ Buffer fills with validated plans
└─ EXECUTED: ⚠️ No new results (expected)

Stream Configurations

Stream 1: MARKET_DATA

{
  name: 'MARKET_DATA',
  subjects: ['market.swap_route.>', 'market.price.>', 'market.pool.>'],
  retention: 'limits',
  max_age: 3600 * 1e9,         // 1 hour
  max_msgs: 100000,            // 100k messages
  max_bytes: 1073741824,       // 1 GB
  storage: 'memory',           // ⚡ In-memory for speed
  replicas: 1,
  discard: 'old',
  duplicate_window: 5 * 1e9,   // 5 second deduplication
}

Stream 2: OPPORTUNITIES

{
  name: 'OPPORTUNITIES',
  subjects: ['opportunity.two_hop.>', 'opportunity.triangular.>'],
  retention: 'limits',
  max_age: 86400 * 1e9,        // 24 hours
  max_msgs: 10000,             // 10k messages
  max_bytes: 104857600,        // 100 MB
  storage: 'file',             // 💾 Persistent for debugging
  replicas: 1,
  discard: 'old',
  duplicate_window: 10 * 1e9,  // 10 second deduplication
}

Stream 3: PLANNED

{
  name: 'PLANNED',
  subjects: ['execution.planned.>', 'execution.rejected.>'],
  retention: 'limits',
  max_age: 3600 * 1e9,         // 1 hour (short-lived)
  max_msgs: 1000,              // 1k messages
  max_bytes: 10485760,         // 10 MB
  storage: 'file',             // 💾 Persistent
  replicas: 1,
  discard: 'old',
  duplicate_window: 10 * 1e9,  // 10 second deduplication
}

Stream 4: EXECUTED

{
  name: 'EXECUTED',
  subjects: ['executed.started.>', 'executed.completed.>', 'executed.failed.>'],
  retention: 'limits',
  max_age: 604800 * 1e9,       // 7 days
  max_msgs: -1,                // Unlimited (up to max_bytes)
  max_bytes: 1073741824,       // 1 GB
  storage: 'file',             // 💾 Persistent for audit
  replicas: 1,
  discard: 'old',
  duplicate_window: 60 * 1e9,  // 60 second deduplication
}

Stream 5: METRICS

{
  name: 'METRICS',
  subjects: ['metrics.latency.>', 'metrics.throughput.>', 'metrics.pnl.>', 'metrics.strategy.>'],
  retention: 'limits',
  max_age: 3600 * 1e9,         // 1 hour
  max_msgs: 50000,             // 50k messages
  max_bytes: 104857600,        // 100 MB
  storage: 'memory',           // ⚡ In-memory for speed
  replicas: 1,
  discard: 'old',
  duplicate_window: 5 * 1e9,   // 5 second deduplication
}

Stream 6: SYSTEM

{
  name: 'SYSTEM',
  subjects: ['system.lifecycle.>', 'system.killswitch.>', 'system.health.>'],
  retention: 'limits',
  max_age: 2592000 * 1e9,      // 30 days
  max_msgs: 10000,             // 10k messages
  max_bytes: 52428800,         // 50 MB
  storage: 'file',             // 💾 Persistent for compliance
  replicas: 1,
  discard: 'old',
  duplicate_window: 60 * 1e9,  // 60 second deduplication
}
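The six configurations differ only in subjects, limits, and storage. As an illustration, the shared shape can be factored into a helper; `buildStreamConfig` below is a sketch, not part of the codebase, and the resulting objects match the JSON configs shown above (in nats.js they would be passed to a JetStream manager's stream-creation call):

```typescript
// Illustrative helper (not in the codebase): builds the stream configs above
// from human-readable units. NATS expresses durations in nanoseconds.
interface StreamSpec {
  name: string;
  subjects: string[];
  maxAgeSec: number;
  maxMsgs: number;
  maxBytes: number;
  storage: 'memory' | 'file';
  dupWindowSec: number;
}

function buildStreamConfig(spec: StreamSpec) {
  return {
    name: spec.name,
    subjects: spec.subjects,
    retention: 'limits',
    max_age: spec.maxAgeSec * 1e9,              // seconds → nanoseconds
    max_msgs: spec.maxMsgs,
    max_bytes: spec.maxBytes,
    storage: spec.storage,
    replicas: 1,
    discard: 'old',
    duplicate_window: spec.dupWindowSec * 1e9,  // seconds → nanoseconds
  };
}

// Reproduces Stream 1 (MARKET_DATA) exactly:
const marketData = buildStreamConfig({
  name: 'MARKET_DATA',
  subjects: ['market.swap_route.>', 'market.price.>', 'market.pool.>'],
  maxAgeSec: 3600,
  maxMsgs: 100_000,
  maxBytes: 1_073_741_824,
  storage: 'memory',
  dupWindowSec: 5,
});
```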

Subject Hierarchy

# MARKET_DATA stream
market.swap_route.<tokenIn>.<tokenOut>
market.price.<symbol>
market.pool.<dex>.<poolId>

# OPPORTUNITIES stream
opportunity.two_hop.detected.<tokenStart>.<tokenIntermediate>
opportunity.triangular.detected.<tokenA>.<tokenB>.<tokenC>

# PLANNED stream
execution.planned.<opportunityId>
execution.rejected.<opportunityId>.<reason>

# EXECUTED stream
executed.started.<opportunityId>
executed.completed.<opportunityId>.<status>
executed.failed.<opportunityId>.<error>

# METRICS stream
metrics.latency.<component>.<operation>
metrics.throughput.<component>.<metric_type>
metrics.pnl.<strategy>.<token_pair>
metrics.strategy.<strategy_name>.<performance_metric>

# SYSTEM stream
system.lifecycle.start.<environment>
system.lifecycle.shutdown.<reason>
system.killswitch.<trigger>
system.health.slot_drift.<severity>
system.health.stalled.<component>
system.health.connection.<connection_type>.<status>

Event Flow: Quote-Service → Scanner → Planner → Executor

Complete Pipeline Diagram

┌─────────────────────────────────────────────────────────────────────┐
│ STAGE 0: QUOTE-SERVICE (Go)                                        │
│ ─────────────────────────────────────────────────────────────────── │
│ Input: On-chain pool data (RPC + cached)                           │
│                                                                     │
│ Processing (<10ms per quote):                                      │
│ • Query all pools for token pair                                   │
│ • Calculate optimal routing                                        │
│ • Cache quotes (30s refresh)                                       │
│                                                                     │
│ Publishes to NATS:                                                 │
│ • Stream: MARKET_DATA                                              │
│ • Subject: market.swap_route.<tokenIn>.<tokenOut>                  │
│ • Event: SwapRouteEvent                                            │
│                                                                     │
│ Also provides:                                                     │
│ • gRPC: QuoteService.StreamQuotes() - streaming quotes             │
│ • HTTP: GET /quote?input=<>&output=<>&amount=<>                    │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                            │
                            │ NATS: market.swap_route.* (optional)
                            │ OR gRPC streaming (primary)
                            ↓
┌─────────────────────────────────────────────────────────────────────┐
│ STAGE 1: SCANNER (Fast Pre-Filter)                                 │
│ ─────────────────────────────────────────────────────────────────── │
│ Subscribes to NATS:                                                │
│ • Stream: MARKET_DATA (optional, primary is gRPC)                  │
│   └─ Subjects: market.swap_route.>                                 │
│ • Stream: SYSTEM (for lifecycle events) ⚠️ CRITICAL                │
│   └─ Subjects: system.killswitch.>, system.lifecycle.shutdown.*    │
│                                                                     │
│ Input Options:                                                     │
│ • Option A: gRPC quote stream from quote-service (primary)         │
│ • Option B: NATS MARKET_DATA stream (alternative)                  │
│                                                                     │
│ Processing (<10ms):                                                │
│ • detectArbitrage() - Simple math, no RPC                          │
│ • Calculate rough profit estimates                                 │
│ • Build complete execution path (SwapHop[])                        │
│ • Confidence: 0.5-0.8 (rough, pre-filter only)                     │
│ • Handle KillSwitch: Stop scanning immediately on trigger          │
│ • Handle Shutdown: Graceful cleanup on system shutdown             │
│                                                                     │
│ Publishes to NATS:                                                 │
│ • Stream: OPPORTUNITIES                                            │
│   └─ Subject: opportunity.two_hop.detected.<tokenStart>.<tokenIntermediate> │
│   └─ Event: TwoHopArbitrageEvent (FlatBuffers)                     │
│ • Stream: METRICS                                                  │
│   └─ Subject: metrics.latency.scanner.*                            │
│   └─ Event: LatencyMetricEvent (FlatBuffers)                       │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                            │
                            │ NATS: opportunity.two_hop.*
                            ↓
┌─────────────────────────────────────────────────────────────────────┐
│ STAGE 2: PLANNER (Deep Validation / Strategy Framework)            │
│ ─────────────────────────────────────────────────────────────────── │
│ Subscribes to NATS:                                                │
│ • Stream: OPPORTUNITIES (opportunities to validate)                │
│   └─ Subjects: opportunity.>                                       │
│   └─ Events: TwoHopArbitrageEvent, TriangularArbitrageEvent (FlatBuffers) │
│ • Stream: MARKET_DATA (fresh quotes for simulation) ⚠️ CRITICAL    │
│   └─ Subjects: market.swap_route.>, market.price.>                 │
│   └─ Events: SwapRouteEvent, PriceUpdateEvent (FlatBuffers)        │
│ • Stream: SYSTEM (for lifecycle events) ⚠️ CRITICAL                │
│   └─ Subjects: system.killswitch.>, system.lifecycle.shutdown.*    │
│                                                                     │
│ Processing (50-100ms):                                             │
│ • Fetch fresh quotes from MARKET_DATA (if Scanner's quotes stale)  │
│ • RPC simulation with current pool state (expensive but necessary) │
│ • Deep profit validation                                           │
│ • Risk scoring                                                     │
│ • Choose execution method (Jito/TPU/RPC)                           │
│ • Confidence: 0.9+ (high accuracy after simulation)                │
│ • Handle KillSwitch: Stop planning + reject all pending            │
│ • Handle Shutdown: Finish current validation, graceful exit        │
│                                                                     │
│ Publishes to NATS:                                                 │
│ • Stream: PLANNED                                                   │
│   └─ Subject: execution.planned.<opportunityId>                    │
│   └─ Event: ExecutionPlanEvent (FlatBuffers)                       │
│ • Stream: PLANNED (rejections)                                      │
│   └─ Subject: execution.rejected.<opportunityId>.<reason>          │
│   └─ Event: ExecutionRejectedEvent (FlatBuffers)                   │
│ • Stream: METRICS                                                  │
│   └─ Subject: metrics.latency.planner.*                            │
│   └─ Event: LatencyMetricEvent (FlatBuffers)                       │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                            │
                            │ NATS: execution.planned.*
                            ↓
┌─────────────────────────────────────────────────────────────────────┐
│ STAGE 3: EXECUTOR (Dumb Submission / Executor Framework)           │
│ ─────────────────────────────────────────────────────────────────── │
│ Subscribes to NATS:                                                │
│ • Stream: PLANNED                                                   │
│   └─ Subjects: execution.planned.>                                 │
│   └─ Events: ExecutionPlanEvent (FlatBuffers)                      │
│ • Stream: SYSTEM (for lifecycle events) ⚠️ CRITICAL                │
│   └─ Subjects: system.killswitch.>, system.lifecycle.shutdown.*    │
│                                                                     │
│ Processing (<20ms):                                                │
│ • Build transaction from pre-computed path                         │
│ • Sign with wallet                                                 │
│ • Submit via Jito/TPU/RPC (as specified in plan)                   │
│ • NO validation, NO simulation (Planner already did this)          │
│ • Handle KillSwitch: Stop execution + cancel pending transactions  │
│ • Handle Shutdown: Finish in-flight txs, graceful exit             │
│                                                                     │
│ Publishes to NATS:                                                 │
│ • Stream: EXECUTED                                                 │
│   └─ Subject: executed.started.<opportunityId>                     │
│   └─ Event: ExecutionStartedEvent (FlatBuffers)                    │
│ • Stream: EXECUTED                                                 │
│   └─ Subject: executed.completed.<opportunityId>.<status>          │
│   └─ Event: ExecutionResultEvent (FlatBuffers)                     │
│ • Stream: EXECUTED                                                 │
│   └─ Subject: executed.failed.<opportunityId>.<error>              │
│   └─ Event: ExecutionFailedEvent (FlatBuffers)                     │
│ • Stream: METRICS                                                  │
│   └─ Subject: metrics.latency.executor.*, metrics.pnl.*            │
│   └─ Events: LatencyMetricEvent, PnLMetricEvent (FlatBuffers)      │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                            │
                            │ NATS: executed.completed.*
                            ↓
                   PnL Service, Monitor, Analytics

Responsibility Matrix

| Service       | Publishes To  | Subjects | Events | Purpose |
|---------------|---------------|----------|--------|---------|
| Quote-Service | MARKET_DATA   | market.swap_route.*, market.price.*, market.pool.* | SwapRouteEvent, PriceUpdateEvent, PoolStateEvent | Real-time market data (quotes, prices, pool updates) |
| Scanner       | OPPORTUNITIES | opportunity.two_hop.*, opportunity.triangular.* | TwoHopArbitrageEvent, TriangularArbitrageEvent | Detected opportunities (pre-validation) |
| Planner       | PLANNED       | execution.planned.*, execution.rejected.* | ExecutionPlanEvent, ExecutionRejectedEvent | Validated plans (ready to execute) |
| Executor      | EXECUTED      | executed.started.*, executed.completed.*, executed.failed.* | ExecutionStartedEvent, ExecutionResultEvent, ExecutionFailedEvent | Execution results (for PnL, monitoring) |

Consumption Matrix

| Service     | Consumes From | Subjects | Events | Purpose |
|-------------|---------------|----------|--------|---------|
| Scanner     | MARKET_DATA (optional) | market.swap_route.> | SwapRouteEvent | Alternative to gRPC streaming |
| Scanner     | SYSTEM | system.killswitch.>, system.lifecycle.shutdown.* | KillSwitchEvent, SystemShutdownEvent | Stop scanning on kill switch or shutdown |
| Planner     | OPPORTUNITIES + MARKET_DATA | opportunity.>, market.swap_route.>, market.price.> | TwoHopArbitrageEvent + SwapRouteEvent, PriceUpdateEvent | Validate opportunities with fresh market data |
| Planner     | SYSTEM | system.killswitch.>, system.lifecycle.shutdown.* | KillSwitchEvent, SystemShutdownEvent | Stop planning + reject pending on kill switch |
| Executor    | PLANNED | execution.planned.> | ExecutionPlanEvent | Execute validated plans |
| Executor    | SYSTEM | system.killswitch.>, system.lifecycle.shutdown.* | KillSwitchEvent, SystemShutdownEvent | Cancel pending txs on kill switch |
| PnL Service | EXECUTED | executed.completed.> | ExecutionResultEvent | Calculate profit/loss from signatures |
| Monitor     | EXECUTED + SYSTEM | executed.>, system.health.> | All execution + health events | Alert on failures, track success rate |
| Analytics   | EXECUTED + METRICS | executed.>, metrics.> | All execution + metric events | Historical analysis, strategy performance |

Notes:

  • ALL frameworks (Scanner, Planner, Executor) MUST subscribe to SYSTEM stream for kill switch and graceful shutdown
  • Scanner typically uses gRPC streaming from quote-service for lowest latency (<1ms), but can alternatively consume from NATS MARKET_DATA stream (2-5ms latency) for replay/testing scenarios
  • Planner (Strategy Framework) subscribes to BOTH OPPORTUNITIES and MARKET_DATA streams to validate opportunities with fresh quotes
  • Executor Framework subscribes to PLANNED stream for execution plans and SYSTEM for safety controls

Distributed Tracing Flow

TraceId: 550e8400-e29b-41d4-a716-446655440000

Stage 1: quote-service (Go)
├─ Span: quote_received (0ms)
└─ Publishes: SwapRouteEvent (traceId propagated)

Stage 2: scanner-service (TS)
├─ Span: arbitrage_detected (8ms)
├─ Parent: quote_received
└─ Publishes: TwoHopArbitrageEvent (traceId propagated)

Stage 3: planner-service (TS)
├─ Span: arbitrage_validated (85ms)
├─ Parent: arbitrage_detected
└─ Publishes: ExecutionPlanEvent (traceId propagated)

Stage 4: executor-service (TS)
├─ Span: arbitrage_executed (18ms)
├─ Parent: arbitrage_validated
└─ Publishes: ExecutionResultEvent (traceId propagated)

TOTAL: 111ms (quote → profit in wallet)
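Because each span's parent is the previous stage, end-to-end latency is the sum of the stage durations. A quick check of the trace above against the <200ms budget (durations taken from the trace; the code itself is only illustrative):

```typescript
// Stage durations from the distributed trace above.
const stages = [
  { span: 'quote_received', ms: 0 },
  { span: 'arbitrage_detected', ms: 8 },
  { span: 'arbitrage_validated', ms: 85 },
  { span: 'arbitrage_executed', ms: 18 },
];
const BUDGET_MS = 200;

// Sequential pipeline: total latency is the sum of stage durations.
const totalMs = stages.reduce((sum, s) => sum + s.ms, 0); // 111
const withinBudget = totalMs < BUDGET_MS;                  // true
```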

TwoHopArbitrageEvent Design

Event Structure (Go)

// go/pkg/events/events.go

type TwoHopArbitrageEvent struct {
	Type       string `json:"type"`
	TraceId    string `json:"traceId"`    // UUID for distributed tracing
	Timestamp  int64  `json:"timestamp"`
	Slot       uint64 `json:"slot"`

	// Opportunity identification
	OpportunityId      string `json:"opportunityId"` // Unique ID for deduplication

	// Tokens involved
	TokenStart         string `json:"tokenStart"`        // Starting token (e.g., SOL)
	TokenIntermediate  string `json:"tokenIntermediate"` // Intermediate token (e.g., mSOL, USDC)

	// Execution path (critical for performance)
	Path               []SwapHop `json:"path"` // [A→B, B→A] with full execution details

	// Profit analysis
	AmountIn           string  `json:"amountIn"`           // Input amount (as string for u64)
	ExpectedAmountOut  string  `json:"expectedAmountOut"`  // Expected output after round-trip
	EstimatedProfitUsd float64 `json:"estimatedProfitUsd"` // Estimated profit in USD
	EstimatedProfitBps int32   `json:"estimatedProfitBps"` // Estimated profit in basis points
	NetProfitUsd       float64 `json:"netProfitUsd"`       // Profit after fees

	// Risk metrics
	TotalFees          string  `json:"totalFees"`          // Total fees (all hops)
	PriceImpactBps     int32   `json:"priceImpactBps"`     // Total price impact
	RequiredCapitalUsd float64 `json:"requiredCapitalUsd"` // Capital needed (for flash loan sizing)
	Confidence         float64 `json:"confidence"`         // 0.0 - 1.0 (scanner's confidence)

	// Liquidity context
	Hop1LiquidityUsd   float64 `json:"hop1LiquidityUsd"`   // First hop pool liquidity
	Hop2LiquidityUsd   float64 `json:"hop2LiquidityUsd"`   // Second hop pool liquidity

	// Timing
	ExpiresAt          int64   `json:"expiresAt"`          // Estimated expiration timestamp
	DetectedAtSlot     uint64  `json:"detectedAtSlot"`     // Slot when detected

	// Scanner context (for strategy optimization)
	ScannerName        string                 `json:"scannerName"`        // Which scanner detected this
	ScannerMetadata    map[string]interface{} `json:"scannerMetadata"`    // Scanner-specific metadata
}

type SwapHop struct {
	AmmKey     string `json:"ammKey"`     // Pool/AMM address
	Label      string `json:"label"`      // DEX name ("Raydium", "Orca", etc.)
	InputMint  string `json:"inputMint"`  // Input token mint address
	OutputMint string `json:"outputMint"` // Output token mint address
	InAmount   string `json:"inAmount"`   // Input amount as string
	OutAmount  string `json:"outAmount"`  // Output amount as string
	FeeAmount  string `json:"feeAmount"`  // Fee amount as string
	FeeMint    string `json:"feeMint"`    // Fee token mint address
}

Example Event: SOL → mSOL → SOL

{
  "type": "TwoHopArbitrage",
  "traceId": "550e8400-e29b-41d4-a716-446655440000",
  "opportunityId": "2hop_SOL_mSOL_12345678",
  "tokenStart": "So11111111111111111111111111111111111111112",
  "tokenIntermediate": "mSoLzYCxHdYgdzU16g5QSh3i5K3z3KZK7ytfqcJm7So",
  "path": [
    {
      "ammKey": "6a1CsrpeZubDjEJE9s1CMVheB6HWM5d7m1cj2jkhyXhj",
      "label": "raydium_amm",
      "inputMint": "So11111111111111111111111111111111111111112",
      "outputMint": "mSoLzYCxHdYgdzU16g5QSh3i5K3z3KZK7ytfqcJm7So",
      "inAmount": "1000000000",
      "outAmount": "950000000",
      "feeAmount": "2500",
      "feeMint": "So11111111111111111111111111111111111111112"
    },
    {
      "ammKey": "9tzZzEHsKnwFL1A3DyFJwj36KnZj3gZ7g4srWp9YTEoh",
      "label": "orca_whirlpool",
      "inputMint": "mSoLzYCxHdYgdzU16g5QSh3i5K3z3KZK7ytfqcJm7So",
      "outputMint": "So11111111111111111111111111111111111111112",
      "inAmount": "950000000",
      "outAmount": "1005000000",
      "feeAmount": "950000",
      "feeMint": "mSoLzYCxHdYgdzU16g5QSh3i5K3z3KZK7ytfqcJm7So"
    }
  ],
  "amountIn": "1000000000",
  "expectedAmountOut": "1005000000",
  "estimatedProfitUsd": 0.25,
  "estimatedProfitBps": 50,
  "netProfitUsd": 0.22,
  "totalFees": "3450",
  "priceImpactBps": 15,
  "requiredCapitalUsd": 50.0,
  "confidence": 0.75,
  "hop1LiquidityUsd": 150000.0,
  "hop2LiquidityUsd": 200000.0,
  "expiresAt": 1734512350000,
  "detectedAtSlot": 12345678,
  "scannerName": "arbitrage-quote-scanner",
  "scannerMetadata": {
    "source": "gRPC_quote_stream",
    "filterType": "two_hop_round_trip",
    "quoteCacheSize": 1247
  }
}
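The profit fields in the example can be recomputed directly from the path: the round trip starts with hop 1's `inAmount` and ends with hop 2's `outAmount`. Using BigInt math as the scanner does (values copied from the event above):

```typescript
// amountIn = hop 1 inAmount, finalAmount = hop 2 outAmount (from the example event)
const amountIn = 1_000_000_000n;
const finalAmount = 1_005_000_000n;

// Profit in basis points, matching the event's estimatedProfitBps of 50
const profitBps = Number((finalAmount - amountIn) * 10_000n / amountIn);

// Gross profit in lamports, before fees
const grossProfitLamports = finalAmount - amountIn; // 5_000_000n
```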

Scanner Implementation

ArbitrageQuoteScanner Architecture

// ts/apps/scanner-service/src/scanners/arbitrage-quote-scanner.ts

export class ArbitrageQuoteScanner extends SubscriptionScanner {
  private quoteCache: Map<string, QuoteStreamResponse[]> = new Map();
  private readonly CACHE_TTL_MS = 5000; // 5 second cache
  private readonly MIN_PROFIT_BPS = 30; // 0.3% minimum for pre-filter

  protected async process(quote: QuoteStreamResponse): Promise<MarketEvent[]> {
    const tracer = trace.getTracer('scanner-service');

    // Extract parent trace context from quote
    const parentContext = quote.traceId
      ? this.extractTraceContext(quote.traceId)
      : context.active();

    return await tracer.startActiveSpan(
      'arbitrage_detection',
      { kind: SpanKind.INTERNAL },
      parentContext,
      async (span) => {
        try {
          // STAGE 1: Update cache
          this.updateQuoteCache(quote);

          // STAGE 2: Fast pre-filter
          const candidates = await this.detectArbitrage(quote);

          span.setAttribute('candidates.found', candidates.length);

          if (candidates.length === 0) {
            span.setStatus({ code: SpanStatusCode.OK, message: 'No opportunities' });
            return [];
          }

          // STAGE 3: Convert to TwoHopArbitrageEvent
          const events = candidates.map(candidate =>
            this.toTwoHopArbitrageEvent(candidate, span.spanContext().traceId)
          );

          span.setAttribute('events.published', events.length);
          span.setStatus({ code: SpanStatusCode.OK });

          return events;
        } catch (error) {
          span.recordException(error as Error);
          span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
          return [];
        } finally {
          span.end();
        }
      }
    );
  }
}

Fast Pre-Filter Logic

/**
 * FAST PRE-FILTER: Detect potential arbitrage opportunities
 *
 * CONSTRAINTS:
 * - No RPC calls (fast)
 * - Use cached data only
 * - Simple math only
 * - High false positive rate is OK (strategy validates)
 */
private async detectArbitrage(quote: QuoteStreamResponse): Promise<ArbitrageCandidate[]> {
  const candidates: ArbitrageCandidate[] = [];

  // Get cached reverse quotes: outputMint → inputMint
  const reverseQuotes = this.getCachedReverseQuotes(
    quote.outputMint,
    quote.inputMint
  );

  if (reverseQuotes.length === 0) return [];

  for (const reverseQuote of reverseQuotes) {
    // Check freshness
    if (Date.now() - reverseQuote.timestamp > this.CACHE_TTL_MS) {
      continue;
    }

    // Calculate round-trip profit
    const amountIn = BigInt(quote.inAmount);
    const finalAmount = BigInt(reverseQuote.outAmount);

    if (finalAmount <= amountIn) continue;

    // Profit in basis points
    const profitBps = Number((finalAmount - amountIn) * 10000n / amountIn);

    // Pre-filter threshold
    if (profitBps < this.MIN_PROFIT_BPS) continue;

    // Calculate rough confidence (no simulation)
    const confidence = this.calculatePreFilterConfidence(
      quote,
      reverseQuote,
      profitBps
    );

    candidates.push({
      tokenStart: quote.inputMint,
      tokenIntermediate: quote.outputMint,
      forwardQuote: quote,
      reverseQuote: reverseQuote,
      profitBps,
      confidence,
    });
  }

  // Sort by profit, return top 5
  return candidates.sort((a, b) => b.profitBps - a.profitBps).slice(0, 5);
}

Pre-Filter Confidence Calculation

/**
 * Calculate pre-filter confidence (no RPC simulation)
 *
 * FACTORS:
 * - Profit magnitude (higher = better)
 * - Pool liquidity (higher = better)
 * - Price impact (lower = better)
 * - Quote freshness (newer = better)
 */
private calculatePreFilterConfidence(
  forwardQuote: QuoteStreamResponse,
  reverseQuote: QuoteStreamResponse,
  profitBps: number
): number {
  let confidence = 0.4; // Base confidence (conservative)

  // Factor 1: Profit magnitude
  if (profitBps > 50) confidence += 0.1;   // >0.5%
  if (profitBps > 100) confidence += 0.1;  // >1.0%
  if (profitBps > 200) confidence += 0.1;  // >2.0%

  // Factor 2: Liquidity
  const minLiquidity = Math.min(
    forwardQuote.liquidityUsd || 0,
    reverseQuote.liquidityUsd || 0
  );
  if (minLiquidity > 50000) confidence += 0.05;   // >$50k
  if (minLiquidity > 100000) confidence += 0.1;   // >$100k
  if (minLiquidity > 500000) confidence += 0.1;   // >$500k

  // Factor 3: Price impact
  const totalSlippage = forwardQuote.slippageBps + reverseQuote.slippageBps;
  if (totalSlippage < 50) confidence += 0.1;   // <0.5%
  if (totalSlippage < 20) confidence += 0.05;  // <0.2%

  // Factor 4: Freshness
  const maxAge = Math.max(
    Date.now() - forwardQuote.timestamp,
    Date.now() - reverseQuote.timestamp
  );
  if (maxAge < 1000) confidence += 0.05;  // <1s old

  return Math.min(confidence, 1.0);
}
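To make the weights auditable, the same scoring can be restated as a pure function of the four factors and exercised with illustrative numbers (the weights below mirror the method above; the inputs are made up):

```typescript
// Standalone re-statement of the pre-filter scoring, detached from quote
// objects so the weight table can be sanity-checked in isolation.
function preFilterConfidence(
  profitBps: number,
  minLiquidityUsd: number,
  totalSlippageBps: number,
  maxQuoteAgeMs: number
): number {
  let c = 0.4; // conservative base
  if (profitBps > 50) c += 0.1;
  if (profitBps > 100) c += 0.1;
  if (profitBps > 200) c += 0.1;
  if (minLiquidityUsd > 50_000) c += 0.05;
  if (minLiquidityUsd > 100_000) c += 0.1;
  if (minLiquidityUsd > 500_000) c += 0.1;
  if (totalSlippageBps < 50) c += 0.1;
  if (totalSlippageBps < 20) c += 0.05;
  if (maxQuoteAgeMs < 1000) c += 0.05;
  return Math.min(c, 1.0);
}

// 1.2% profit, $150k liquidity, 0.3% round-trip slippage, 500ms-old quotes:
// 0.4 + 0.2 + 0.15 + 0.1 + 0.05 ≈ 0.9
console.log(preFilterConfidence(120, 150_000, 30, 500));
```

Note that all weights summed exceed 1.0 (0.4 base + 0.75 in factors), so the `Math.min` cap is load-bearing for the best opportunities.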

Quote Caching

/**
 * Update quote cache for reverse lookup
 */
private updateQuoteCache(quote: QuoteStreamResponse): void {
  const key = `${quote.inputMint}:${quote.outputMint}`;

  if (!this.quoteCache.has(key)) {
    this.quoteCache.set(key, []);
  }

  const quotes = this.quoteCache.get(key)!;
  quotes.push(quote);

  // Keep only recent quotes
  const cutoff = Date.now() - this.CACHE_TTL_MS;
  this.quoteCache.set(
    key,
    quotes.filter(q => q.timestamp > cutoff)
  );

  // Cleanup old cache entries (1% chance per quote)
  if (Math.random() < 0.01) {
    this.cleanupCache();
  }
}

/**
 * Get cached reverse quotes: outputMint → inputMint
 */
private getCachedReverseQuotes(
  outputMint: string,
  inputMint: string
): QuoteStreamResponse[] {
  const key = `${outputMint}:${inputMint}`;
  return this.quoteCache.get(key) || [];
}
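The pruning step inside `updateQuoteCache` is the part worth testing in isolation: only quotes newer than the TTL cutoff survive. A pure sketch, assuming the 5-second TTL the Scanner section describes:

```typescript
// TTL pruning as a pure function (timestamps in ms since epoch).
// Mirrors the filter inside updateQuoteCache; ttlMs assumed 5000.
function pruneQuotes<T extends { timestamp: number }>(
  quotes: T[],
  now: number,
  ttlMs: number
): T[] {
  const cutoff = now - ttlMs;
  return quotes.filter(q => q.timestamp > cutoff);
}

const now = 100_000;
const kept = pruneQuotes(
  [{ timestamp: now - 6000 }, { timestamp: now - 1000 }],
  now,
  5000
);
console.log(kept.length); // 1 — only the 1s-old quote survives a 5s TTL
```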

Planner Implementation (Future)

Why Planner Needs MARKET_DATA Access

Problem: Scanner uses 5-second cached quotes for fast pre-filtering. By the time Planner validates an opportunity (50-100ms later), those quotes may be stale.

Solution: Planner subscribes to MARKET_DATA stream for fresh quotes during RPC simulation.

// Planner validation flow
async validateOpportunity(arb: TwoHopArbitrageEvent): Promise<ExecutionPlanEvent | null> {
  // STEP 1: Check if Scanner's quotes are still fresh
  const quoteAge = Date.now() - arb.timestamp;

  if (quoteAge > 2000) {
    // STEP 2: Quotes are stale (>2s old), fetch fresh quotes from MARKET_DATA
    const freshQuotes = await this.fetchFreshQuotesFromMarketData(
      arb.tokenStart,
      arb.tokenIntermediate
    );

    // STEP 3: Recalculate profit with fresh quotes
    const updatedProfit = this.calculateProfit(freshQuotes);

    if (updatedProfit < this.MIN_PROFIT_THRESHOLD) {
      return null; // Opportunity expired
    }
  }

  // STEP 4: RPC simulation with current pool state
  const simulation = await this.simulateArbitrage(arb);

  // STEP 5: Create execution plan if profitable
  if (simulation.success && simulation.netProfit > 0.01) {
    return this.createExecutionPlan(arb, simulation);
  }

  return null;
}

Benefits:

  • Accuracy: Planner validates with fresh data, not stale Scanner cache
  • Flexibility: Can re-quote specific pairs without waiting for Scanner
  • Performance: Only fetches fresh quotes when needed (if stale)
  • Reliability: Reduces false positives from price movement between detection and execution

Trade-off: Planner has slightly higher latency (50-100ms) but much higher accuracy (90%+ vs Scanner’s 65%).

ExecutionPlanEvent Structure

// ts/packages/market-events/src/events.ts

export interface ExecutionPlanEvent {
  type: 'ExecutionPlan';
  traceId: string; // Propagated from TwoHopArbitrageEvent
  timestamp: number;
  slot: number;

  // Reference to source opportunity
  opportunityId: string;
  sourceEvent: 'TwoHopArbitrage' | 'TriangularArbitrage';

  // Execution details
  executionMethod: 'jito_bundle' | 'tpu_direct' | 'rpc_fallback' | 'parallel_racing';
  path: SwapHop[]; // May be modified by strategy

  // Validation results
  simulationResult: {
    success: boolean;
    netProfit: number;
    gasUsed: number;
    logs: string[];
  };

  // Execution parameters
  jitoTip?: string;        // If using Jito
  computeBudget: number;   // Compute units
  priorityFee: string;     // Priority fee (lamports)

  // Risk assessment (after simulation)
  confidence: number;      // Updated confidence after simulation
  riskScore: number;       // 0.0 - 1.0 (lower is safer)

  // Strategy context
  strategyName: string;
  strategyMetadata: Record<string, unknown>;

  // Timing
  validUntil: number;      // Deadline for execution
}
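The `validUntil` field is an absolute deadline in epoch milliseconds; a plausible consumer-side guard (not specified by the interface itself, just an assumption about how the Executor would use it) is to drop any plan past its deadline rather than submit a stale transaction:

```typescript
// Hypothetical guard: reject an ExecutionPlanEvent whose deadline passed.
// Structural typing keeps the sketch independent of the full interface.
function isPlanExecutable(plan: { validUntil: number }, nowMs: number): boolean {
  return nowMs < plan.validUntil;
}

console.log(isPlanExecutable({ validUntil: 1_000 }, 999));   // true
console.log(isPlanExecutable({ validUntil: 1_000 }, 1_000)); // false
```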

Planner Logic (Conceptual)

// ts/apps/planner-service/src/strategies/two-hop-arbitrage-strategy.ts

export class TwoHopArbitrageStrategy implements TradingStrategy {
  async onMarketEvent(event: MarketEvent): Promise<ExecutionPlanEvent[]> {
    if (event.type !== 'TwoHopArbitrage') return [];

    const arb = event as TwoHopArbitrageEvent;

    // STAGE 1: Pre-validation
    if (arb.confidence < 0.5) return []; // Scanner wasn't confident
    if (arb.estimatedProfitBps < 50) return []; // <0.5% profit

    // STAGE 2: RPC Simulation (expensive!)
    const simulation = await this.simulateArbitrage(arb);
    if (!simulation.success) return [];
    if (simulation.netProfit < 0.01) return []; // <0.01 SOL profit

    // STAGE 3: Choose execution method
    const executionMethod = this.chooseExecutionMethod(arb, simulation);

    // STAGE 4: Create execution plan
    return [this.createExecutionPlan(arb, simulation, executionMethod)];
  }

  private chooseExecutionMethod(
    arb: TwoHopArbitrageEvent,
    simulation: SimulationResult
  ): ExecutionMethod {
    // High-value: Use Jito for MEV protection
    if (simulation.netProfit > 0.1) return 'jito_bundle';

    // Medium-value: Try TPU direct (faster, no tip)
    if (simulation.netProfit > 0.03) return 'tpu_direct';

    // Low-value: Use RPC (cheapest)
    return 'rpc_fallback';
  }
}
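The tiered routing in `chooseExecutionMethod` can be exercised directly; a standalone restatement with the same thresholds (netProfit in SOL, matching the strategy's units):

```typescript
type ExecutionMethod = 'jito_bundle' | 'tpu_direct' | 'rpc_fallback';

// Same thresholds as the strategy above, detached for illustration.
function routeByProfit(netProfitSol: number): ExecutionMethod {
  if (netProfitSol > 0.1) return 'jito_bundle'; // high value: pay for MEV protection
  if (netProfitSol > 0.03) return 'tpu_direct'; // medium: faster, no tip
  return 'rpc_fallback';                        // low value: cheapest path
}

console.log(routeByProfit(0.15)); // jito_bundle
console.log(routeByProfit(0.05)); // tpu_direct
console.log(routeByProfit(0.01)); // rpc_fallback
```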

Executor Implementation (Future)

ExecutionResultEvent Structure

export interface ExecutionResultEvent {
  type: 'ExecutionResult';
  traceId: string;
  opportunityId: string;
  signature: string;        // On-chain transaction signature
  actualProfit: number;     // Real profit from on-chain data
  actualProfitUsd: number;
  gasUsed: number;
  blockTime: number;
  executionMethod: string;
  status: 'success' | 'partial_fill' | 'failed';
  timestamp: number;
}

Executor Logic (Conceptual)

// ts/apps/executor-service/src/executor.ts

export class JitoExecutor {
  async execute(plan: ExecutionPlanEvent): Promise<ExecutionResultEvent> {
    // STAGE 1: Build transaction from pre-computed path
    const tx = await this.buildTransaction(plan.path);

    // STAGE 2: Sign with wallet
    tx.sign([this.wallet]);

    // STAGE 3: Submit via Jito
    const signature = await this.submitViaJito(tx, plan.jitoTip);

    // STAGE 4: Wait for confirmation
    await this.connection.confirmTransaction(signature);

    // STAGE 5: Calculate actual profit from on-chain data
    const txData = await this.connection.getTransaction(signature);
    const actualProfit = this.calculateProfitFromTx(txData);

    return {
      type: 'ExecutionResult',
      traceId: plan.traceId,
      opportunityId: plan.opportunityId,
      signature,
      actualProfit,
      actualProfitUsd: actualProfit * this.solPriceUsd, // assumes a cached SOL/USD price
      gasUsed: txData?.meta?.fee ?? 0,
      blockTime: txData?.blockTime ?? 0,
      executionMethod: plan.executionMethod,
      status: 'success',
      timestamp: Date.now(),
    };
  }
}

Deployment & Configuration

Docker Compose Setup

# deployment/docker/docker-compose.yml

services:
  nats:
    image: nats:2.10-alpine
    ports:
      - "4222:4222"   # Client connections
      - "8222:8222"   # HTTP monitoring
      - "6222:6222"   # Cluster routing
    command:
      - "--jetstream"
      - "--store_dir=/data"
      - "--max_file_store=10GB"
      - "--max_mem_store=2GB"
      - "--http_port=8222"
    volumes:
      - nats-data:/data
    healthcheck:
      test: ["CMD", "wget", "--spider", "-q", "http://localhost:8222/healthz"]  # server image has no nats CLI; probe the HTTP monitor
      interval: 10s
      timeout: 5s
      retries: 3

  nats-setup:
    image: natsio/nats-box:latest
    depends_on:
      nats:
        condition: service_healthy
    volumes:
      - ./nats-init.sh:/init.sh
    command: ["/init.sh"]

volumes:
  nats-data:

NATS Initialization Script

#!/bin/bash
# deployment/docker/nats-init.sh

set -e

echo "Creating MARKET_DATA stream..."
nats stream add MARKET_DATA \
  --subjects "market.>" \
  --retention limits \
  --max-age 1h \
  --max-msgs 100000 \
  --max-bytes 1GB \
  --storage memory \
  --replicas 1 \
  --discard old \
  --dupe-window 5s

echo "Creating OPPORTUNITIES stream..."
nats stream add OPPORTUNITIES \
  --subjects "opportunity.>" \
  --retention limits \
  --max-age 24h \
  --max-msgs 10000 \
  --max-bytes 100MB \
  --storage file \
  --replicas 1 \
  --discard old \
  --dupe-window 10s

echo "Creating PLANNED stream..."
nats stream add PLANNED \
  --subjects "execution.>" \
  --retention limits \
  --max-age 1h \
  --max-msgs 1000 \
  --max-bytes 10MB \
  --storage file \
  --replicas 1 \
  --discard old \
  --dupe-window 10s

echo "Creating EXECUTED stream..."
nats stream add EXECUTED \
  --subjects "executed.>" \
  --retention limits \
  --max-age 7d \
  --max-msgs -1 \
  --max-bytes 1GB \
  --storage file \
  --replicas 1 \
  --discard old \
  --dupe-window 60s

echo "Creating METRICS stream..."
nats stream add METRICS \
  --subjects "metrics.>" \
  --retention limits \
  --max-age 1h \
  --max-msgs 50000 \
  --max-bytes 100MB \
  --storage memory \
  --replicas 1 \
  --discard old \
  --dupe-window 5s

echo "Creating SYSTEM stream..."
nats stream add SYSTEM \
  --subjects "system.>" \
  --retention limits \
  --max-age 30d \
  --max-msgs 10000 \
  --max-bytes 50MB \
  --storage file \
  --replicas 1 \
  --discard old \
  --dupe-window 60s

echo "JetStream setup complete! Created 6 streams."
nats stream list
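The six `--subjects` filters partition the subject namespace by prefix, so any publisher can be routed without knowing stream names. A small routing sketch (prefix table mirrors the script above; the example subjects are illustrative):

```typescript
// Map a published subject to the JetStream whose --subjects filter
// captures it, per nats-init.sh.
const STREAM_PREFIXES: Array<[prefix: string, stream: string]> = [
  ['market.', 'MARKET_DATA'],
  ['opportunity.', 'OPPORTUNITIES'],
  ['execution.', 'PLANNED'],
  ['executed.', 'EXECUTED'],
  ['metrics.', 'METRICS'],
  ['system.', 'SYSTEM'],
];

function streamFor(subject: string): string | undefined {
  return STREAM_PREFIXES.find(([p]) => subject.startsWith(p))?.[1];
}

console.log(streamFor('opportunity.twohop.SOL-USDC'));      // OPPORTUNITIES
console.log(streamFor('system.killswitch.excessive_loss')); // SYSTEM
console.log(streamFor('unrouted.subject'));                 // undefined
```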

Performance Targets

Latency Breakdown

| Stage | Target | Critical Path | Notes |
|---|---|---|---|
| Scanner | <10ms | ✅ Yes | Fast pre-filter only |
| NATS (OPPORTUNITIES) | 2-5ms | ✅ Yes | File storage latency |
| Planner | 50-100ms | ✅ Yes | RPC simulation (unavoidable) |
| NATS (PLANNED) | 2-5ms | ✅ Yes | File storage latency |
| Executor | <20ms | ✅ Yes | Sign + submit |
| NATS (EXECUTED) | 5-10ms | ❌ No | After trade, not critical |
| **TOTAL** | <200ms | - | Scanner → Executor |
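Summing the worst case of each critical-path stage shows how much headroom the 200ms budget leaves; a quick check (NATS EXECUTED hop excluded because it happens after the trade):

```typescript
// Upper-bound latency of each critical-path stage, per the table above.
const criticalPathMs = [
  10,  // Scanner
  5,   // NATS (OPPORTUNITIES)
  100, // Planner
  5,   // NATS (PLANNED)
  20,  // Executor
];
const totalMs = criticalPathMs.reduce((a, b) => a + b, 0);
console.log(totalMs); // 140 — worst case still clears the 200ms budget
```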

Throughput Targets

| Stream | Events/sec | Peak | Buffer Size |
|---|---|---|---|
| MARKET_DATA | 10,000 | 20,000 | 100k messages |
| OPPORTUNITIES | 500 | 1,000 | 10k messages |
| PLANNED | 50 | 100 | 1k messages |
| EXECUTED | 50 | 100 | 10k messages |
| METRICS | 1,000-5,000 | 10,000 | 50k messages |
| SYSTEM | 1-10 | 50 | 10k messages |
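One consequence worth noting: at sustained peak rates, the message cap dominates the age limit, because `--discard old` evicts the oldest messages first. The effective window is `min(maxAge, maxMsgs / rate)`; a sketch using the MARKET_DATA figures above:

```typescript
// Effective retention window when both --max-age and --max-msgs apply
// and old messages are discarded first.
function effectiveWindowSec(maxAgeSec: number, maxMsgs: number, eventsPerSec: number): number {
  return Math.min(maxAgeSec, maxMsgs / eventsPerSec);
}

// MARKET_DATA: 1h age cap, 100k-message buffer, 10k events/sec sustained
// → the buffer actually holds only ~10s of data, not the full hour.
console.log(effectiveWindowSec(3600, 100_000, 10_000)); // 10
```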

Success Metrics

Scanner:
├─ False positive rate: <35% (acceptable)
├─ Latency: <10ms (critical)
└─ Throughput: 500 opportunities/sec

Planner:
├─ Filtering efficiency: >80% (reject bad opportunities)
├─ Simulation accuracy: >90% (profit estimate ± 10%)
└─ Latency: 50-100ms (RPC-dependent)

Executor:
├─ Success rate: >80% (with proper validation)
├─ Jito bundle landing rate: >95%
└─ Latency: <20ms (sign + submit only)

End-to-End:
├─ Total latency: <200ms (quote → profit)
├─ Profitable trades: >60% (after all filtering)
└─ Average profit: >0.05 SOL per trade
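The Planner's "simulation accuracy >90% (profit estimate ± 10%)" metric implies a per-trade relative-error check once the Executor reports actual profit. A hedged sketch of that check (not from the source, just one way to operationalize the metric):

```typescript
// Was actual profit within ±tolerance of the simulated estimate?
function withinEstimate(
  simulatedProfit: number,
  actualProfit: number,
  tolerance = 0.10
): boolean {
  if (simulatedProfit === 0) return actualProfit === 0;
  return Math.abs(actualProfit - simulatedProfit) / Math.abs(simulatedProfit) <= tolerance;
}

console.log(withinEstimate(0.05, 0.047)); // true  (6% low, inside ±10%)
console.log(withinEstimate(0.05, 0.03));  // false (40% low)
```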

Framework SYSTEM Stream Integration

Why ALL Frameworks Must Subscribe to SYSTEM

Critical Safety Requirement: All trading components (Scanner, Planner, Executor) MUST subscribe to the SYSTEM stream for:

  1. Kill Switch Events: Immediately stop all trading activity when triggered
  2. Graceful Shutdown: Clean up resources and finish in-flight operations
  3. System Health Monitoring: React to slot drift, connection failures, stalled components

Framework Responsibilities

Scanner Framework (go/pkg/scanner)

MUST Subscribe To:

  • system.killswitch.> - Stop scanning immediately on any kill switch trigger
  • system.lifecycle.shutdown.* - Graceful shutdown on system stop

MUST Implement:

func (s *Scanner) handleKillSwitch(event *KillSwitchEvent) {
    s.logger.Error("Kill switch triggered", "trigger", event.Trigger)
    s.stopScanning()  // Stop immediately
    s.publishMetric("scanner.stopped", "killswitch")
}

func (s *Scanner) handleShutdown(event *SystemShutdownEvent) {
    s.logger.Info("Graceful shutdown initiated")
    s.stopScanning()  // Stop gracefully
    s.cleanup()       // Clean up resources
}

Strategy Framework (Planner) (ts/packages/strategy-framework)

MUST Subscribe To:

  • system.killswitch.> - Stop planning + reject all pending opportunities
  • system.lifecycle.shutdown.* - Finish current validation, then graceful exit
  • market.swap_route.>, market.price.> - Fresh quotes for simulation ⚠️ CRITICAL

MUST Implement:

async handleKillSwitch(event: KillSwitchEvent): Promise<void> {
  this.logger.error('Kill switch triggered', { trigger: event.trigger });
  this.stopPlanning();  // Stop immediately
  await this.rejectPendingOpportunities();  // Reject all buffered
  await this.publishMetric('planner.stopped', 'killswitch');
}

async handleShutdown(event: SystemShutdownEvent): Promise<void> {
  this.logger.info('Graceful shutdown initiated');
  await this.finishCurrentValidation();  // Finish in-flight
  this.stopPlanning();
  await this.cleanup();
}

Executor Framework (ts/packages/executor-framework)

MUST Subscribe To:

  • system.killswitch.> - Cancel pending transactions immediately
  • system.lifecycle.shutdown.* - Finish in-flight txs, then graceful exit

MUST Implement:

async handleKillSwitch(event: KillSwitchEvent): Promise<void> {
  this.logger.error('Kill switch triggered', { trigger: event.trigger });
  await this.cancelPendingTransactions();  // Cancel all pending
  this.stopExecution();  // Stop immediately
  await this.publishMetric('executor.stopped', 'killswitch');
}

async handleShutdown(event: SystemShutdownEvent): Promise<void> {
  this.logger.info('Graceful shutdown initiated');
  await this.finishInFlightTransactions();  // Wait for pending txs
  this.stopExecution();
  await this.cleanup();
}
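The subscriptions above rely on NATS wildcard semantics: `*` matches exactly one subject token, `>` matches one or more trailing tokens. A minimal matcher sketch, useful for unit-testing which events a framework would receive:

```typescript
// Token-wise NATS subject matching: '*' = one token, '>' = rest of subject.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split('.');
  const s = subject.split('.');
  for (let i = 0; i < p.length; i++) {
    if (p[i] === '>') return s.length > i; // '>' needs at least one more token
    if (i >= s.length) return false;
    if (p[i] !== '*' && p[i] !== s[i]) return false;
  }
  return p.length === s.length;
}

console.log(subjectMatches('system.killswitch.>', 'system.killswitch.excessive_loss'));       // true
console.log(subjectMatches('system.lifecycle.shutdown.*', 'system.lifecycle.shutdown.manual')); // true
console.log(subjectMatches('system.killswitch.>', 'system.killswitch'));                       // false
```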

SYSTEM Stream Event Flow

KILL SWITCH TRIGGERED:
┌─────────────────────────────────────────────────────────┐
│ Monitor detects critical condition (excessive loss)     │
│   ↓ Publishes: system.killswitch.excessive_loss        │
├─────────────────────────────────────────────────────────┤
│ Scanner receives KillSwitchEvent                        │
│   → Stop scanning immediately                           │
│   → No new opportunities published                      │
├─────────────────────────────────────────────────────────┤
│ Planner receives KillSwitchEvent                        │
│   → Stop planning immediately                           │
│   → Reject all buffered opportunities                   │
│   → Publish ExecutionRejectedEvent (reason: killswitch) │
├─────────────────────────────────────────────────────────┤
│ Executor receives KillSwitchEvent                       │
│   → Cancel all pending transactions                     │
│   → Stop execution immediately                          │
│   → Publish ExecutionFailedEvent (error: killswitch)    │
└─────────────────────────────────────────────────────────┘
Result: System stops trading within <100ms

GRACEFUL SHUTDOWN:
┌─────────────────────────────────────────────────────────┐
│ User triggers shutdown (SIGTERM / manual)               │
│   ↓ Publishes: system.lifecycle.shutdown.manual        │
├─────────────────────────────────────────────────────────┤
│ Scanner receives SystemShutdownEvent                    │
│   → Finish current detection cycle                      │
│   → Stop scanning gracefully                            │
│   → Publish final metrics                               │
├─────────────────────────────────────────────────────────┤
│ Planner receives SystemShutdownEvent                    │
│   → Finish validating current opportunity               │
│   → Stop planning gracefully                            │
│   → Publish final execution plan or rejection           │
├─────────────────────────────────────────────────────────┤
│ Executor receives SystemShutdownEvent                   │
│   → Wait for in-flight transactions (timeout: 30s)      │
│   → Stop execution gracefully                           │
│   → Publish final execution results                     │
└─────────────────────────────────────────────────────────┘
Result: System stops trading cleanly within 30-60s

Migration Checklist

Phase 1: FlatBuffers Setup (Week 1, Day 1-2)

  • Install flatc compiler on all dev machines
  • Run flatbuf/scripts/generate-all.ps1 to generate code
  • Verify generated code in flatbuf/generated/{go,typescript,rust}/
  • Add FlatBuffers dependencies to Go/TypeScript/Rust projects
  • Test round-trip serialization/deserialization

Phase 2: NATS Infrastructure (Week 1, Day 2-3)

  • Update docker-compose.yml with 6-stream NATS config (METRICS + SYSTEM)
  • Create nats-init.sh script with all 6 streams
  • Test NATS setup: docker-compose up -d nats
  • Verify all 6 streams created: nats stream list
  • Test publishing/consuming FlatBuffers events from each stream

Phase 3: Framework SYSTEM Integration (Week 1, Day 3-4)

  • Scanner: Add SYSTEM stream subscription (killswitch, shutdown)
  • Scanner: Implement handleKillSwitch() and handleShutdown()
  • Planner: Add SYSTEM stream subscription
  • Planner: Implement handleKillSwitch() with rejectPendingOpportunities()
  • Executor: Add SYSTEM stream subscription
  • Executor: Implement handleKillSwitch() with cancelPendingTransactions()
  • Test kill switch propagation (all frameworks stop within 100ms)
  • Test graceful shutdown (all frameworks cleanup within 30-60s)

Phase 4: Scanner Migration to FlatBuffers (Week 1, Day 4-5)

  • Update ArbitrageQuoteScanner to publish FlatBuffers TwoHopArbitrageEvent
  • Implement FlatBuffers serialization in detectArbitrage()
  • Add OpenTelemetry tracing to scanner
  • Add scanner metadata to events
  • Update NATS publishing to OPPORTUNITIES stream (FlatBuffers)
  • Publish LatencyMetricEvent to METRICS stream
  • Add unit tests for FlatBuffers serialization

Phase 5: Planner Migration to FlatBuffers (Week 2, Day 1-3)

  • Update TradingStrategy to consume FlatBuffers TwoHopArbitrageEvent
  • Subscribe to MARKET_DATA stream for fresh quotes (FlatBuffers)
  • Implement FlatBuffers deserialization in strategy validation
  • Publish FlatBuffers ExecutionPlanEvent to PLANNED stream
  • Publish FlatBuffers ExecutionRejectedEvent for rejections
  • Publish LatencyMetricEvent to METRICS stream
  • Test with FlatBuffers OPPORTUNITIES events

Phase 6: Executor Migration to FlatBuffers (Week 2, Day 4-5)

  • Update Executor to consume FlatBuffers ExecutionPlanEvent
  • Implement FlatBuffers deserialization in transaction building
  • Publish FlatBuffers ExecutionResultEvent to EXECUTED stream
  • Publish FlatBuffers ExecutionFailedEvent for failures
  • Publish PnLMetricEvent to METRICS stream
  • Test with FlatBuffers PLANNED events

Phase 7: End-to-End Testing (Week 3, Day 1-2)

  • Integration test: Quote → FlatBuffers events → Execution
  • Load test: 1000 events/sec throughput (all FlatBuffers)
  • Verify trace propagation end-to-end (FlatBuffers preserves traceId)
  • Measure latency reduction (JSON vs FlatBuffers)
  • Test kill switch under load (system stops < 100ms)
  • Test graceful shutdown under load
  • Validate FlatBuffers event data completeness

Phase 8: Production Deployment (Week 3, Day 3-5)

  • Deploy Scanner with FlatBuffers + SYSTEM integration
  • Monitor OPPORTUNITIES stream volume (FlatBuffers)
  • Deploy Planner with FlatBuffers + MARKET_DATA + SYSTEM
  • Monitor PLANNED stream volume (FlatBuffers)
  • Deploy Executor with FlatBuffers + SYSTEM
  • Monitor EXECUTED + METRICS streams
  • Measure end-to-end latency (<200ms target)
  • Verify FlatBuffers performance gains (10-20ms reduction)
  • Tune thresholds based on real data

Conclusion

Key Architectural Decisions

6 Separate JetStreams - Clear separation of concerns, different lifecycles

  • MARKET_DATA: High-throughput market data (10k events/sec)
  • OPPORTUNITIES: Pre-filtered opportunities (500 events/sec)
  • PLANNED: Validated execution plans (50 events/sec)
  • EXECUTED: Execution results (50 events/sec, 7-day retention)
  • METRICS: Performance metrics (1-5k events/sec, 1-hour retention)
  • SYSTEM: Lifecycle & health events (1-10 events/sec, 30-day retention)

FlatBuffers for All Events - Zero-copy serialization

  • 20-150x faster deserialization (0.1-0.5μs vs 8-15μs)
  • 30% smaller message size (300-400 bytes vs 450-600 bytes)
  • Expected latency reduction: 10-20ms per event (5-10% of 200ms budget)
  • Minimal memory allocations (reduces GC pressure)

TwoHopArbitrageEvent with Path - Complete execution data, no re-quoting needed

Scanner = Fast Pre-Filter - <10ms, high false positive rate OK

Planner = Deep Validation - 50-100ms, RPC simulation for accuracy

  • Subscribes to MARKET_DATA for fresh quotes during validation
  • Subscribes to SYSTEM for kill switch and graceful shutdown

Executor = Dumb Submission - <20ms, trusts Planner’s validation

  • Subscribes to SYSTEM for kill switch and transaction cancellation

Distributed Tracing - TraceId propagation in FlatBuffers for end-to-end visibility

Kill Switch Safety - All frameworks subscribe to SYSTEM stream

  • System stops trading within <100ms on kill switch
  • Graceful shutdown within 30-60s with proper cleanup

Performance Expectations

Scanner:  8ms   (quote → TwoHopArbitrageEvent FlatBuffers)
NATS:     2ms   (OPPORTUNITIES stream latency)
Planner:  85ms  (TwoHopArbitrageEvent → ExecutionPlan FlatBuffers)
NATS:     2ms   (PLANNED stream latency)
Executor: 18ms  (ExecutionPlan → Transaction)
────────────────
TOTAL:    115ms (quote → profit in wallet)

With FlatBuffers optimization: -20ms (10ms serialization + 10ms deserialization).
Final target: 95ms end-to-end (well under the 200ms target) ✅

FlatBuffers Performance Impact

| Operation | JSON (baseline) | FlatBuffers | Improvement |
|---|---|---|---|
| Scanner publish | 10ms | 2ms | -8ms |
| Planner consume | 8ms | 0.5ms | -7.5ms |
| Planner publish | 10ms | 2ms | -8ms |
| Executor consume | 8ms | 0.5ms | -7.5ms |
| **Total saved** | 36ms | 5ms | -31ms |

SYSTEM Stream Safety Benefits

| Scenario | Without SYSTEM | With SYSTEM | Improvement |
|---|---|---|---|
| Excessive loss | Manual intervention (minutes) | Auto kill switch (<100ms) | 100x faster |
| Connection failure | Trades continue blindly | Auto pause + alert | Prevents losses |
| Slot drift | Unknown until too late | Real-time detection | Early warning |
| Graceful shutdown | Forced kill (lost state) | Clean exit (preserved state) | Data integrity |

Next Steps

  1. Week 1: FlatBuffers setup + NATS 6-stream deployment + SYSTEM integration
  2. Week 2: Migrate Scanner/Planner/Executor to FlatBuffers
  3. Week 3: End-to-end testing + production deployment
  4. Result: <100ms end-to-end execution with full safety controls

Your HFT system is architecturally sound, performance-optimized (FlatBuffers), and production-safe (SYSTEM stream) - ready for implementation! 🚀