Pool State Ownership Migration - Local Quote Service

Date: January 1, 2026
Status: 🎯 Active Architecture - Consolidating Pool State Management
Version: 1.0
Related Docs: 30-QUOTE-SERVICE-ARCHITECTURE.md, 25-POOL-DISCOVERY-DESIGN.md


Table of Contents

  1. Executive Summary
  2. Problem Statement
  3. Architecture Comparison
  4. New Responsibilities
  5. Pool State Update Flow
  6. Default Reserve Strategy
  7. WebSocket Subscription Management
  8. In-Memory Cache Architecture
  9. Redis Usage (Backup Only)
  10. Migration Timeline
  11. Performance Impact
  12. Configuration
  13. Monitoring & Alerts

Executive Summary

What Changed?

The Local Quote Service now owns ALL pool state management, consolidating responsibilities previously split between Pool Discovery Service and Quote Service. This architectural simplification eliminates dual-responsibility patterns and creates a single source of truth for pool data.

Key Decisions

Aspect          Previous (Split)                   New (Consolidated)
Pool Discovery  Discovery + WebSocket updates      Discovery only
Pool Updates    Split (simple→Redis, complex→RPC)  All in Quote Service
WebSocket       Pool Discovery manages             Quote Service manages
Primary Cache   Redis (1-2ms)                      In-memory (<1μs)
State Owner     Unclear (both services)            Quote Service only

Benefits

✅ 1000x faster pool access (in-memory <1μs vs Redis 1-2ms)
✅ Single source of truth (no coordination needed)
✅ Simpler debugging (all updates in one service)
✅ WebSocket-first updates (sub-second freshness)
✅ RPC backup (automatic fallback on WebSocket failure)
✅ Redis persistence (crash recovery, optional sharing)


Problem Statement

Previous Architecture Issues

The Problem: Pool state management was split between two services with unclear boundaries.

┌──────────────────────────────────────────────────────────────┐
│ POOL DISCOVERY SERVICE                                       │
│ ──────────────────────────────────────────────────────────── │
│ Responsibility 1: Discover new pools ✅                      │
│ Responsibility 2: Update simple pools via WebSocket ❌       │
│ Responsibility 3: Write to Redis ❌                          │
└──────────────────────────────────────────────────────────────┘
                            ↓
                    Redis (1-2ms latency)
                            ↓
┌──────────────────────────────────────────────────────────────┐
│ LOCAL QUOTE SERVICE                                          │
│ ──────────────────────────────────────────────────────────── │
│ • Read simple pools from Redis ❌                            │
│ • Fetch complex pools via RPC (can't avoid anyway) ✅        │
│ • No WebSocket subscriptions ❌                              │
└──────────────────────────────────────────────────────────────┘

Issues:

  1. Dual Responsibility: Pool Discovery had to maintain WebSocket subscriptions for simple pools, mixing concerns
  2. Redis Bottleneck: 1-2ms latency vs <1μs in-memory (1000x slower)
  3. Couldn’t Avoid RPC Anyway: Complex pools (CLMM/DLMM) have tick/bin data too large for Redis, so Quote Service needed RPC anyway
  4. Debugging Complexity: Pool state changes tracked in two different services
  5. Coordination Required: Deploying one service required understanding impact on the other

The Insight

"If Quote Service needs RPC for complex pools anyway (CLMM ticks too large for Redis), why not have it own ALL pool state updates? Then Pool Discovery can focus solely on discovery."

This realization led to the new architecture.


Architecture Comparison

Previous Architecture (Split Responsibility)

┌──────────────────────────────────────────────────────────────┐
│ POOL DISCOVERY SERVICE (Dual Responsibility)                 │
│ ──────────────────────────────────────────────────────────── │
│                                                              │
│ PRIMARY: Discover new pools                                  │
│ ├─ RPC: GetProgramAccountsWithOpts                           │
│ ├─ Filter by token pair                                      │
│ ├─ Solscan enrichment (TVL, reserves)                        │
│ └─ NATS: pool.discovered event                               │
│                                                              │
│ SECONDARY: Update simple pools ←─ PROBLEM                    │
│ ├─ WebSocket: accountSubscribe (AMM/CPMM pools)              │
│ ├─ Decode vault balances                                     │
│ ├─ Write to Redis                                            │
│ └─ Complex pools ignored (too large for Redis)               │
│                                                              │
└──────────────────────────────────────────────────────────────┘
                            ↓
               Redis (pool state cache)
                    1-2ms read latency
                            ↓
┌──────────────────────────────────────────────────────────────┐
│ LOCAL QUOTE SERVICE (Hybrid Data Access)                     │
│ ──────────────────────────────────────────────────────────── │
│                                                              │
│ • Read simple pools from Redis (AMM/CPMM)                    │
│ • Fetch complex pools via RPC (CLMM/DLMM)                    │
│ • Cache in-memory (but sourced from Redis initially)         │
│ • No WebSocket subscriptions                                 │
│ • Background refresh (re-read Redis every 10s)               │
│                                                              │
└──────────────────────────────────────────────────────────────┘

PROBLEMS:
❌ Pool Discovery maintains WebSocket just for simple pools
❌ Quote Service reads stale data from Redis (10s polling)
❌ Quote Service needs RPC anyway for CLMM/DLMM
❌ Redis is 1000x slower than in-memory
❌ Unclear ownership: who is source of truth?

New Architecture (Single Responsibility)

┌──────────────────────────────────────────────────────────────┐
│ POOL DISCOVERY SERVICE (Discovery Only) ✅                   │
│ ──────────────────────────────────────────────────────────── │
│                                                              │
│ SOLE RESPONSIBILITY: Discover new pools                      │
│                                                              │
│ 1. RPC Scanning (GetProgramAccountsWithOpts)                 │
│    ├─ Raydium AMM V4, CPMM, CLMM                             │
│    ├─ Meteora DLMM                                           │
│    ├─ PumpSwap AMM                                           │
│    └─ Orca Whirlpools                                        │
│                                                              │
│ 2. Pool Enrichment (Solscan API - initial only)              │
│    ├─ TVL (liquidity USD)                                    │
│    ├─ Reserve amounts (base + quote)                         │
│    ├─ Token decimals                                         │
│    └─ Default to 1T units if enrichment fails                │
│                                                              │
│ 3. Event Publishing (NATS)                                   │
│    ├─ pool.discovered (new pool found)                       │
│    └─ pool.enriched (Solscan data added)                     │
│                                                              │
│ ✅ NO WebSocket subscriptions                                │
│ ✅ NO pool state updates                                     │
│ ✅ NO Redis writes                                           │
│                                                              │
└──────────────────────────────────────────────────────────────┘
                            ↓
                 NATS: pool.discovered.*
                            ↓
┌──────────────────────────────────────────────────────────────┐
│ LOCAL QUOTE SERVICE (State Owner) ✅                         │
│ ──────────────────────────────────────────────────────────── │
│                                                              │
│ SOLE RESPONSIBILITY: Own all pool state                      │
│                                                              │
│ 1. Pool Discovery Event Handling                             │
│    ├─ Subscribe: NATS pool.discovered.*                      │
│    ├─ Add to in-memory cache                                 │
│    ├─ Initialize with default reserves (1T units)            │
│    └─ Subscribe to WebSocket updates                         │
│                                                              │
│ 2. Pool State Updates (ALL pools - WebSocket first)          │
│    WebSocket Subscriptions (real-time, primary):             │
│    ├─ AMM/CPMM: accountSubscribe (vault balances)            │
│    ├─ CLMM: accountSubscribe (pool + tick arrays)            │
│    ├─ DLMM: accountSubscribe (pool + bin arrays)             │
│    ├─ Sub-second latency                                     │
│    └─ Auto-reconnect with exponential backoff                │
│                                                              │
│    RPC Polling (backup, 30s interval):                       │
│    ├─ Triggered on WebSocket failure                         │
│    ├─ Fetch vault balances (AMM/CPMM)                        │
│    ├─ Fetch tick arrays (CLMM)                               │
│    └─ Fetch bin arrays (DLMM)                                │
│                                                              │
│ 3. In-Memory Cache (Primary, 1000x faster)                   │
│    HOT POOLS (actively quoted):                              │
│    ├─ <1μs access time                                       │
│    ├─ LRU eviction (keep 500 hottest)                        │
│    └─ 1000x faster than Redis                                │
│                                                              │
│    COLD POOLS (rarely quoted):                               │
│    ├─ Fetch on-demand from RPC                               │
│    └─ Cache for 60s                                          │
│                                                              │
│ 4. Redis Persistence (Backup/Recovery only)                  │
│    ├─ Async write every 10s (non-blocking)                   │
│    ├─ Crash recovery: Read on startup                        │
│    ├─ Cross-instance sharing (optional)                      │
│    └─ NOT the primary data source                            │
│                                                              │
│ 5. Pool State Validation                                     │
│    ├─ Oracle price deviation alerts (>1%)                    │
│    ├─ Reserve sanity checks (never nil/zero)                 │
│    └─ Freshness tracking (mark stale if >60s)                │
│                                                              │
└──────────────────────────────────────────────────────────────┘

BENEFITS:
✅ Single source of truth (no coordination needed)
✅ 1000x faster (<1μs in-memory vs 1-2ms Redis)
✅ WebSocket-first (sub-second updates vs 10s polling)
✅ RPC backup (automatic fallback)
✅ Simpler debugging (all pool updates in one service)
✅ Clear ownership (Quote Service owns state)
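The hot-pool tier above (LRU, keep the 500 hottest) can be sketched as a mutex-guarded map plus a recency list. This is an illustrative sketch, not the service's actual cache; only the 500-entry cap and the `PoolState` name come from this document, everything else is assumed:

```go
package main

import (
	"container/list"
	"sync"
	"time"
)

// PoolState mirrors the cached entry described above (fields assumed).
type PoolState struct {
	PoolID     string
	LastUpdate time.Time
	IsStale    bool
}

// PoolCache is a mutex-guarded LRU cache: hot pools stay resident, and
// the least-recently-quoted pool is evicted once capacity is exceeded.
type PoolCache struct {
	mu    sync.Mutex
	cap   int
	items map[string]*list.Element // poolID -> LRU node
	order *list.List               // front = most recently used
}

func NewPoolCache(capacity int) *PoolCache {
	return &PoolCache{
		cap:   capacity,
		items: make(map[string]*list.Element),
		order: list.New(),
	}
}

func (c *PoolCache) Set(id string, st *PoolState) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.items[id]; ok {
		el.Value = st
		c.order.MoveToFront(el)
		return
	}
	c.items[id] = c.order.PushFront(st)
	if c.order.Len() > c.cap {
		// Evict the coldest pool (back of the recency list).
		back := c.order.Back()
		c.order.Remove(back)
		delete(c.items, back.Value.(*PoolState).PoolID)
	}
}

func (c *PoolCache) Get(id string) (*PoolState, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	el, ok := c.items[id]
	if !ok {
		return nil, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*PoolState), true
}
```

In this design the cache would be sized as `NewPoolCache(500)`, matching the "keep 500 hottest" figure above; cold pools miss the cache and fall through to RPC.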

New Responsibilities

Pool Discovery Service (Simplified)

What It Does:

  • ✅ Discover new pools via RPC scanning
  • ✅ Enrich pools with Solscan API (initial TVL/reserves)
  • ✅ Publish NATS events when pools are found
  • ✅ Set default reserves (1T units) if enrichment fails

What It Does NOT Do:

  • ❌ WebSocket subscriptions
  • ❌ Pool state updates
  • ❌ Redis writes
  • ❌ Quote calculations

Code Example:

// Pool Discovery: Simplified to discovery only
func (s *PoolDiscoveryService) ScanForPools(ctx context.Context, baseMint, quoteMint string) error {
    // 1. RPC scan for pools
    pools, err := s.protocolClient.FetchPoolsByPair(ctx, baseMint, quoteMint)
    if err != nil {
        return err
    }

    for _, pool := range pools {
        // 2. Set default reserves (NEVER nil or zero)
        if pool.BaseReserve.IsNil() || pool.BaseReserve.IsZero() {
            pool.BaseReserve = math.NewInt(1_000_000_000_000) // 1 trillion
        }
        if pool.QuoteReserve.IsNil() || pool.QuoteReserve.IsZero() {
            pool.QuoteReserve = math.NewInt(1_000_000_000_000) // 1 trillion
        }

        // 3. Enrich with Solscan (best effort, non-blocking)
        go func(p *domain.Pool) {
            enriched, err := s.solscanEnricher.EnrichPool(ctx, p)
            if err != nil {
                observability.LogWarn("Solscan enrichment failed, using defaults",
                    observability.String("poolId", p.ID[:8]),
                    observability.Error(err))
                // Use default reserves already set
                s.publishPoolDiscovered(p)
            } else {
                // Use enriched data
                s.publishPoolDiscovered(enriched)
            }
        }(pool)
    }

    return nil
}

func (s *PoolDiscoveryService) publishPoolDiscovered(pool *domain.Pool) {
    // Publish to NATS: pool.discovered.*
    event := &PoolDiscoveredEvent{
        PoolID:       pool.ID,
        ProgramID:    pool.ProgramID,
        DEX:          pool.DEX,
        BaseMint:     pool.BaseMint,
        QuoteMint:    pool.QuoteMint,
        BaseReserve:  pool.BaseReserve.String(),
        QuoteReserve: pool.QuoteReserve.String(),
        LiquidityUSD: pool.LiquidityUSD,
        Timestamp:    time.Now().Unix(),
    }

    s.natsPublisher.Publish("pool.discovered."+pool.DEX, event)
}

Local Quote Service (Enhanced)

New Responsibilities:

  • ✅ Subscribe to NATS pool.discovered.* events
  • ✅ Manage ALL WebSocket subscriptions (all pool types)
  • ✅ Handle WebSocket account updates (decode, update cache)
  • ✅ RPC backup polling (30s interval on WebSocket failure)
  • ✅ Maintain in-memory cache (hot pool priority)
  • ✅ Persist to Redis (async, for crash recovery)
  • ✅ Validate pool state (oracle deviation, reserve sanity)

Code Example:

// Local Quote Service: Enhanced pool state ownership
func (s *LocalQuoteService) Start(ctx context.Context) error {
    // 1. Load persisted pools from Redis (crash recovery)
    if err := s.loadPoolsFromRedis(ctx); err != nil {
        observability.LogWarn("Failed to load pools from Redis", observability.Error(err))
    }

    // 2. Subscribe to pool discovery events
    go s.subscribeToPoolDiscovery(ctx)

    // 3. Start WebSocket manager
    go s.wsManager.Start(ctx)

    // 4. Start RPC backup poller
    go s.rpcBackupPoller(ctx)

    // 5. Start Redis persistence worker
    go s.redisPersistenceWorker(ctx)

    return nil
}

func (s *LocalQuoteService) subscribeToPoolDiscovery(ctx context.Context) {
    // Subscribe to NATS: pool.discovered.*
    sub, err := s.natsClient.Subscribe("pool.discovered.*", func(msg *nats.Msg) {
        var event PoolDiscoveredEvent
        if err := json.Unmarshal(msg.Data, &event); err != nil {
            observability.LogError("Failed to unmarshal pool discovered event",
                observability.Error(err))
            return
        }

        s.handlePoolDiscovered(ctx, &event)
    })

    if err != nil {
        observability.LogError("Failed to subscribe to pool discovery",
            observability.Error(err))
        return
    }

    observability.LogInfo("Subscribed to pool discovery events")

    // Keep subscription alive
    <-ctx.Done()
    sub.Unsubscribe()
}

func (s *LocalQuoteService) handlePoolDiscovered(ctx context.Context, event *PoolDiscoveredEvent) {
    // Convert event to pool
    pool := s.eventToPool(event)

    // Add to in-memory cache
    s.poolCache.Set(pool.ID, &PoolState{
        Pool:        pool,
        LastUpdate:  time.Now(),
        IsStale:     false,
    })

    observability.LogInfo("Pool added to cache",
        observability.String("poolId", pool.ID[:8]),
        observability.String("dex", pool.DEX),
        observability.Uint64("baseReserve", pool.BaseReserve.Uint64()),
        observability.Uint64("quoteReserve", pool.QuoteReserve.Uint64()))

    // Subscribe to WebSocket updates
    if err := s.wsManager.SubscribePool(pool.ID, pool.ProgramID); err != nil {
        observability.LogError("Failed to subscribe to pool updates",
            observability.String("poolId", pool.ID[:8]),
            observability.Error(err))
    }

    // Queue high-priority RPC fetch (get fresh data)
    s.refreshQueue <- RefreshCommand{
        PoolID:   pool.ID,
        PoolType: pool.Type,
        Priority: HIGH,
    }
}

Pool State Update Flow

Real-Time Updates (WebSocket - Primary)

┌──────────────────────────────────────────────────────────────┐
│ 1. Pool Discovery publishes NATS event                       │
│    pool.discovered.raydium_amm                               │
│    {                                                         │
│      poolId: "7xKX...",                                      │
│      baseReserve: "1000000000000",  // 1T default            │
│      quoteReserve: "1000000000000",  // 1T default           │
│      ...                                                     │
│    }                                                         │
└──────────────────────────────────────────────────────────────┘
                            ↓
┌──────────────────────────────────────────────────────────────┐
│ 2. Local Quote Service receives event                        │
│    • Parse pool data                                         │
│    • Add to in-memory cache (default reserves)               │
│    • Subscribe to WebSocket: accountSubscribe(poolId)        │
└──────────────────────────────────────────────────────────────┘
                            ↓
┌──────────────────────────────────────────────────────────────┐
│ 3. WebSocket connection established                          │
│    • Solana RPC WebSocket: ws://rpc.solana.com               │
│    • accountSubscribe request sent                           │
│    • Subscription ID: 12345                                  │
└──────────────────────────────────────────────────────────────┘
                            ↓
┌──────────────────────────────────────────────────────────────┐
│ 4. Account update received (<1s latency)                     │
│    {                                                         │
│      "subscription": 12345,                                  │
│      "result": {                                             │
│        "value": {                                            │
│          "data": ["base64EncodedAccountData", "base64"],     │
│          "slot": 234567890                                   │
│        }                                                     │
│      }                                                       │
│    }                                                         │
└──────────────────────────────────────────────────────────────┘
                            ↓
┌──────────────────────────────────────────────────────────────┐
│ 5. Decode account data (pool-specific)                       │
│    • Base64 decode                                           │
│    • Parse binary layout (AMM/CLMM/DLMM specific)            │
│    • Extract vault balances / tick data / bin data           │
│    • Calculate reserves                                      │
└──────────────────────────────────────────────────────────────┘
                            ↓
┌──────────────────────────────────────────────────────────────┐
│ 6. Update in-memory cache (atomic)                           │
│    pool.BaseReserve = actualVaultBalance                     │
│    pool.QuoteReserve = actualVaultBalance                    │
│    pool.LastUpdate = time.Now()                              │
│    pool.IsStale = false                                      │
└──────────────────────────────────────────────────────────────┘
                            ↓
┌──────────────────────────────────────────────────────────────┐
│ 7. Invalidate Layer 2 quote cache (pool-aware)               │
│    • Find all quotes using this pool                         │
│    • Invalidate ALL matching quotes                          │
│    • Ensures next quote request uses fresh data              │
└──────────────────────────────────────────────────────────────┘
                            ↓
┌──────────────────────────────────────────────────────────────┐
│ 8. Persist to Redis (async, non-blocking)                    │
│    • Marshal pool state to JSON                              │
│    • Write to Redis (fire-and-forget)                        │
│    • Log errors but don't block                              │
└──────────────────────────────────────────────────────────────┘
                            ↓
                   Next quote uses fresh state ✅

Backup Updates (RPC Polling - Secondary)

┌──────────────────────────────────────────────────────────────┐
│ Every 30s OR on WebSocket disconnect                         │
└──────────────────────────────────────────────────────────────┘
                            ↓
┌──────────────────────────────────────────────────────────────┐
│ 1. Check pool staleness                                      │
│    • Pool LastUpdate > 60s = stale                           │
│    • Mark as stale                                           │
│    • Queue for refresh                                       │
└──────────────────────────────────────────────────────────────┘
                            ↓
┌──────────────────────────────────────────────────────────────┐
│ 2. Fetch fresh data from RPC                                 │
│    AMM/CPMM:                                                 │
│    • getMultipleAccounts(vaultAddresses)                     │
│    • Parse vault balances                                    │
│                                                              │
│    CLMM:                                                     │
│    • getAccountInfo(poolAddress)                             │
│    • getMultipleAccounts(tickArrayAddresses)                 │
│    • Parse tick data                                         │
│                                                              │
│    DLMM:                                                     │
│    • getAccountInfo(poolAddress)                             │
│    • getMultipleAccounts(binArrayAddresses)                  │
│    • Parse bin data                                          │
└──────────────────────────────────────────────────────────────┘
                            ↓
┌──────────────────────────────────────────────────────────────┐
│ 3. Update in-memory cache                                    │
│    • Same as WebSocket update flow                           │
│    • Invalidate Layer 2 quotes                               │
│    • Persist to Redis                                        │
└──────────────────────────────────────────────────────────────┘

Default Reserve Strategy

Problem

Newly discovered pools may not have enriched data from Solscan API yet due to:

  • API rate limits
  • Network failures
  • Pool too new (not indexed by Solscan)
  • Solscan circuit breaker open

Solution

Set large default reserves (1 trillion units) to avoid premature filtering.

Key Principles:

  1. Never nil: Reserves must always have a value
  2. Never zero: Zero reserves break pool math
  3. Large defaults: 1T units ensures pool not filtered by liquidity checks
  4. High priority refresh: RPC/WebSocket fetches actual values immediately

Implementation

Pool Discovery Service:

func (s *PoolDiscoveryService) scanPool(ctx context.Context, pool *domain.Pool) {
    // Set defaults FIRST (before enrichment)
    if pool.BaseReserve.IsNil() || pool.BaseReserve.IsZero() {
        pool.BaseReserve = math.NewInt(1_000_000_000_000) // 1 trillion
        observability.LogInfo("Set default base reserve",
            observability.String("poolId", pool.ID[:8]))
    }

    if pool.QuoteReserve.IsNil() || pool.QuoteReserve.IsZero() {
        pool.QuoteReserve = math.NewInt(1_000_000_000_000) // 1 trillion
        observability.LogInfo("Set default quote reserve",
            observability.String("poolId", pool.ID[:8]))
    }

    // Try to enrich (best effort, non-blocking)
    enriched, err := s.solscanEnricher.EnrichPool(ctx, pool)
    if err != nil {
        observability.LogWarn("Solscan enrichment failed, using defaults",
            observability.String("poolId", pool.ID[:8]),
            observability.Error(err))

        // Publish with defaults
        s.publishPoolDiscovered(pool)
    } else {
        // Publish with enriched data
        s.publishPoolDiscovered(enriched)
    }
}

Local Quote Service:

func (s *LocalQuoteService) handlePoolDiscovered(ctx context.Context, event *PoolDiscoveredEvent) {
    pool := s.eventToPool(event)

    // Validate reserves (should never be nil/zero at this point)
    if pool.BaseReserve.IsNil() || pool.BaseReserve.IsZero() {
        observability.LogError("Received pool with nil/zero base reserve",
            observability.String("poolId", pool.ID[:8]))
        // Re-apply defaults
        pool.BaseReserve = math.NewInt(1_000_000_000_000)
    }

    if pool.QuoteReserve.IsNil() || pool.QuoteReserve.IsZero() {
        observability.LogError("Received pool with nil/zero quote reserve",
            observability.String("poolId", pool.ID[:8]))
        // Re-apply defaults
        pool.QuoteReserve = math.NewInt(1_000_000_000_000)
    }

    // Add to cache
    s.poolCache.Set(pool.ID, &PoolState{
        Pool:         pool,
        LastUpdate:   time.Now(),
        IsStale:      false,
        NeedsRefresh: true, // Mark for immediate refresh
    })

    // Subscribe to WebSocket (will get actual values)
    s.wsManager.SubscribePool(pool.ID, pool.ProgramID)

    // Queue HIGH priority RPC fetch (verify actual reserves)
    s.refreshQueue <- RefreshCommand{
        PoolID:   pool.ID,
        Priority: HIGH,
    }
}

WebSocket Subscription Management

Architecture

type WebSocketManager struct {
    conn           *websocket.Conn
    rpcURL         string
    subscriptions  map[string]uint64            // poolID -> subscriptionID
    reverseMap     map[uint64]string            // subscriptionID -> poolID
    updateHandlers map[string]PoolUpdateHandler // poolID -> handler
    reconnectDelay time.Duration
    maxReconnect   int
    mu             sync.RWMutex
}

type PoolUpdateHandler interface {
    HandleUpdate(accountData []byte, slot uint64) error
    GetPoolType() PoolType
}

Subscription Flow

func (w *WebSocketManager) SubscribePool(poolID, programID string) error {
    w.mu.Lock()
    defer w.mu.Unlock()

    // Check if already subscribed
    if _, exists := w.subscriptions[poolID]; exists {
        observability.LogDebug("Pool already subscribed",
            observability.String("poolId", poolID[:8]))
        return nil
    }

    // Build accountSubscribe request
    req := &rpc.AccountSubscribeRequest{
        Account: poolID,
        Config: &rpc.AccountSubscribeConfig{
            Encoding:   "base64",
            Commitment: "confirmed",
        },
    }

    // Send subscription
    subID, err := w.conn.AccountSubscribe(req)
    if err != nil {
        return fmt.Errorf("failed to subscribe: %w", err)
    }

    // Track subscription (bidirectional maps)
    w.subscriptions[poolID] = subID
    w.reverseMap[subID] = poolID

    observability.LogInfo("Subscribed to pool updates",
        observability.String("poolId", poolID[:8]),
        observability.Uint64("subscriptionId", subID))

    // Record metric
    metrics.RecordPoolSubscription(poolID, programID)

    return nil
}

Update Handling

func (w *WebSocketManager) handleAccountNotification(notification *rpc.AccountNotification) {
    // Find pool by subscription ID
    w.mu.RLock()
    poolID, exists := w.reverseMap[notification.Subscription]
    w.mu.RUnlock()

    if !exists {
        observability.LogWarn("Received notification for unknown subscription",
            observability.Uint64("subscriptionId", notification.Subscription))
        return
    }

    // Decode account data (base64 -> bytes)
    accountData, err := base64.StdEncoding.DecodeString(notification.Result.Value.Data[0])
    if err != nil {
        observability.LogError("Failed to decode account data",
            observability.String("poolId", poolID[:8]),
            observability.Error(err))
        return
    }

    // Get update handler for this pool
    w.mu.RLock()
    handler, exists := w.updateHandlers[poolID]
    w.mu.RUnlock()

    if !exists {
        observability.LogWarn("No update handler for pool",
            observability.String("poolId", poolID[:8]))
        return
    }

    // Handle update (pool-specific decoding)
    if err := handler.HandleUpdate(accountData, notification.Result.Value.Slot); err != nil {
        observability.LogError("Failed to handle pool update",
            observability.String("poolId", poolID[:8]),
            observability.Error(err))
        return
    }

    // Record metric
    metrics.RecordPoolUpdate(poolID, time.Since(notification.Result.Value.Timestamp))
}

Reconnection Logic

func (w *WebSocketManager) Start(ctx context.Context) {
    reconnectAttempts := 0

    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        // Connect to WebSocket
        conn, err := websocket.Dial(w.rpcURL)
        if err != nil {
            reconnectAttempts++
            delay := w.calculateBackoff(reconnectAttempts)

            observability.LogWarn("WebSocket connection failed, retrying",
                observability.Error(err),
                observability.Int("attempt", reconnectAttempts),
                observability.Duration("delay", delay))

            time.Sleep(delay)
            continue
        }

        w.conn = conn
        reconnectAttempts = 0

        observability.LogInfo("WebSocket connected")

        // Resubscribe to all pools
        w.resubscribeAll()

        // Listen for messages
        w.listen(ctx)

        // Connection closed, reconnect
        observability.LogWarn("WebSocket connection closed, reconnecting")
    }
}

func (w *WebSocketManager) calculateBackoff(attempts int) time.Duration {
    // Exponential backoff: 1s, 2s, 4s, 8s, 16s, capped at 30s.
    // Return the cap early for large attempt counts; this also guards
    // the shift below against overflow when attempts is very large.
    if attempts > 5 {
        return 30 * time.Second
    }
    delay := time.Duration(1<<uint(attempts-1)) * time.Second
    if delay > 30*time.Second {
        delay = 30 * time.Second
    }
    return delay
}

func (w *WebSocketManager) resubscribeAll() {
    w.mu.RLock()
    poolIDs := make([]string, 0, len(w.subscriptions))
    for poolID := range w.subscriptions {
        poolIDs = append(poolIDs, poolID)
    }
    w.mu.RUnlock()

    // Clear old subscriptions
    w.mu.Lock()
    w.subscriptions = make(map[string]uint64)
    w.reverseMap = make(map[uint64]string)
    w.mu.Unlock()

    // Resubscribe
    for _, poolID := range poolIDs {
        if err := w.SubscribePool(poolID, ""); err != nil {
            observability.LogError("Failed to resubscribe pool",
                observability.String("poolId", poolID[:8]),
                observability.Error(err))
        }
    }

    observability.LogInfo("Resubscribed to all pools",
        observability.Int("count", len(poolIDs)))
}

In-Memory Cache Architecture

Design

type InMemoryPoolCache struct {
    hotPools  *lru.Cache // LRU cache for frequently accessed pools
    coldPools sync.Map   // Infrequently accessed pools
    hotLimit  int        // Max hot pool count (default: 500)
    mu        sync.RWMutex
}

type PoolState struct {
    Pool         *domain.Pool
    CLMMData     *CLMMPoolData // Tick arrays for CLMM
    DLMMData     *DLMMPoolData // Bin arrays for DLMM
    LastUpdate   time.Time
    RefreshCount int
    IsStale      bool
    NeedsRefresh bool
    mu           sync.RWMutex
}

Hot vs Cold Pools

Hot Pools (LRU cache, 500 limit):

  • Actively quoted (accessed in last 60s)
  • Kept in memory for instant access (<1ΞΌs)
  • LRU eviction when cache full
  • Evicted pools move to cold storage

Cold Pools (sync.Map):

  • Rarely quoted (>60s since last access)
  • Fetched from RPC on-demand
  • Cached for 60s, then discarded
  • Promoted to hot if quoted again

Access Pattern

func (c *InMemoryPoolCache) Get(poolID string) (*PoolState, error) {
    // 1. Check hot cache (LRU)
    if val, ok := c.hotPools.Get(poolID); ok {
        observability.LogDebug("Hot pool cache hit",
            observability.String("poolId", poolID[:8]))
        metrics.RecordCacheHit("hot")
        return val.(*PoolState), nil
    }

    // 2. Check cold cache (sync.Map)
    if val, ok := c.coldPools.Load(poolID); ok {
        poolState := val.(*PoolState)

        // Check if stale
        if time.Since(poolState.LastUpdate) < 60*time.Second {
            observability.LogDebug("Cold pool cache hit",
                observability.String("poolId", poolID[:8]))
            metrics.RecordCacheHit("cold")

            // Promote to hot if accessed again
            c.promoteToHot(poolID, poolState)

            return poolState, nil
        }

        // Stale, remove from cold
        c.coldPools.Delete(poolID)
    }

    // 3. Cache miss - fetch from RPC
    observability.LogDebug("Pool cache miss, fetching from RPC",
        observability.String("poolId", poolID[:8]))
    metrics.RecordCacheMiss()

    poolState, err := c.fetchPoolFromRPC(poolID)
    if err != nil {
        return nil, err
    }

    // Add to cold cache
    c.coldPools.Store(poolID, poolState)

    return poolState, nil
}

func (c *InMemoryPoolCache) promoteToHot(poolID string, poolState *PoolState) {
    c.mu.Lock()
    defer c.mu.Unlock()

    // Add to hot cache (LRU evicts oldest if full)
    c.hotPools.Add(poolID, poolState)

    // Remove from cold
    c.coldPools.Delete(poolID)

    observability.LogDebug("Promoted pool to hot cache",
        observability.String("poolId", poolID[:8]))
    metrics.RecordCachePromotion()
}

Performance Characteristics

| Operation | Hot Cache | Cold Cache | RPC Fetch |
|---|---|---|---|
| Access Time | <1ΞΌs | ~10ΞΌs | 50-100ms |
| Capacity | 500 pools | Unlimited | N/A |
| Eviction | LRU | 60s TTL | N/A |
| Thread Safety | Mutex | sync.Map | N/A |
| Promotion | Yes (from cold) | No | To cold |

Redis Usage (Backup Only)

New Role

Redis is no longer the primary data source. It serves two purposes:

  1. Crash recovery: Restore pool state on service restart
  2. Cross-instance sharing: Optional, if running 2+ Local Quote Service instances

Write Pattern (Async, Non-Blocking)

func (s *LocalQuoteService) redisPersistenceWorker(ctx context.Context) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            s.persistPoolsToRedis(ctx)
        case <-ctx.Done():
            // Final persist before shutdown
            s.persistPoolsToRedis(context.Background())
            return
        }
    }
}

func (s *LocalQuoteService) persistPoolsToRedis(ctx context.Context) {
    // Get snapshot of hot pools
    hotPools := s.poolCache.GetAllHotPools()

    observability.LogDebug("Persisting pools to Redis",
        observability.Int("count", len(hotPools)))

    // Persist concurrently (one fire-and-forget goroutine per pool)
    for _, poolState := range hotPools {
        go func(ps *PoolState) {
            // Marshal to JSON
            data, err := json.Marshal(ps.Pool)
            if err != nil {
                observability.LogError("Failed to marshal pool",
                    observability.String("poolId", ps.Pool.ID[:8]),
                    observability.Error(err))
                metrics.RecordRedisPersistError()
                return
            }

            // Write to Redis (fire-and-forget)
            key := fmt.Sprintf("pool:state:%s", ps.Pool.ID)
            if err := s.redisClient.Set(ctx, key, data, 24*time.Hour).Err(); err != nil {
                observability.LogError("Failed to persist pool to Redis",
                    observability.String("poolId", ps.Pool.ID[:8]),
                    observability.Error(err))
                metrics.RecordRedisPersistError()
                return
            }

            metrics.RecordRedisPersistSuccess()
        }(poolState)
    }
}

Read Pattern (Startup Only)

func (s *LocalQuoteService) loadPoolsFromRedis(ctx context.Context) error {
    observability.LogInfo("Loading pools from Redis (crash recovery)")

    // Scan for all pool keys
    var cursor uint64
    var keys []string

    for {
        var batch []string
        var err error

        batch, cursor, err = s.redisClient.Scan(ctx, cursor, "pool:state:*", 100).Result()
        if err != nil {
            return fmt.Errorf("failed to scan Redis: %w", err)
        }

        keys = append(keys, batch...)

        if cursor == 0 {
            break
        }
    }

    observability.LogInfo("Found pools in Redis",
        observability.Int("count", len(keys)))

    // Load pools (parallel)
    var wg sync.WaitGroup
    poolsChan := make(chan *domain.Pool, len(keys))

    for _, key := range keys {
        wg.Add(1)

        go func(k string) {
            defer wg.Done()

            // Get pool data
            data, err := s.redisClient.Get(ctx, k).Bytes()
            if err != nil {
                observability.LogError("Failed to get pool from Redis",
                    observability.String("key", k),
                    observability.Error(err))
                return
            }

            // Unmarshal
            var pool domain.Pool
            if err := json.Unmarshal(data, &pool); err != nil {
                observability.LogError("Failed to unmarshal pool",
                    observability.String("key", k),
                    observability.Error(err))
                return
            }

            poolsChan <- &pool
        }(key)
    }

    // Wait for all loads to complete
    wg.Wait()
    close(poolsChan)

    // Add pools to cache
    loadedCount := 0
    for pool := range poolsChan {
        s.poolCache.Set(pool.ID, &PoolState{
            Pool:         pool,
            LastUpdate:   time.Now(),
            IsStale:      true, // Mark as stale, will refresh
            NeedsRefresh: true,
        })

        // Subscribe to WebSocket
        s.wsManager.SubscribePool(pool.ID, pool.ProgramID)

        // Queue RPC refresh (verify freshness)
        s.refreshQueue <- RefreshCommand{
            PoolID:   pool.ID,
            Priority: NORMAL,
        }

        loadedCount++
    }

    observability.LogInfo("Loaded pools from Redis",
        observability.Int("loaded", loadedCount),
        observability.Int("failed", len(keys)-loadedCount))

    return nil
}

Key Points:

  • βœ… Redis reads are startup only (not on hot path)
  • βœ… Pools loaded from Redis are marked stale and queued for refresh
  • βœ… WebSocket subscriptions re-established immediately
  • βœ… RPC refresh verifies actual pool state (don’t trust stale Redis data)
  • βœ… After 30-60s, all pools refreshed with live data

Migration Timeline

Phase 1: Pool Discovery Simplification (Week 1 - Done)

Goal: Remove WebSocket and Redis writes from Pool Discovery Service.

Tasks:

  • βœ… Remove WebSocket subscription code
  • βœ… Remove Redis write logic for pool updates
  • βœ… Keep only RPC scanning and NATS event publishing
  • βœ… Add default reserve logic (1T units if nil/zero)
  • βœ… Test discovery-only functionality

Acceptance Criteria:

  • Pool Discovery only scans and publishes events
  • No WebSocket connections from Pool Discovery
  • No Redis writes from Pool Discovery
  • Default reserves set for all discovered pools

Phase 2: Local Quote Service Enhancement (Week 2 - In Progress)

Goal: Add pool state ownership to Local Quote Service.

Tasks:

  • πŸ”„ Add NATS subscription for pool.discovered.* events
  • πŸ”„ Implement WebSocket manager with reconnection logic
  • πŸ”„ Add pool account subscription handling
  • πŸ”„ Implement RPC backup polling (30s interval)
  • πŸ”„ Test WebSocket updates and RPC fallback

Acceptance Criteria:

  • Quote Service subscribes to pool discovery events
  • WebSocket subscriptions working for all pool types
  • Account updates decoded and cache updated
  • RPC backup kicks in on WebSocket failure
  • Reconnection works after network issues

Phase 3: In-Memory Cache Priority (Week 3 - Pending)

Goal: Implement performant in-memory cache with Redis backup.

Tasks:

  • ⏳ Implement LRU cache for hot pools (500 pool limit)
  • ⏳ Add Redis async persistence (10s interval)
  • ⏳ Implement crash recovery (read Redis on startup)
  • ⏳ Test cache performance and persistence
  • ⏳ Measure latency improvement (1ms β†’ <1ΞΌs)

Acceptance Criteria:

  • Hot pool access <1ΞΌs latency
  • LRU eviction working correctly
  • Redis persistence every 10s (async)
  • Crash recovery restores pools from Redis
  • Performance improvement measured and documented

Phase 4: Production Migration (Week 4 - Pending)

Goal: Deploy new architecture to production.

Tasks:

  • ⏳ Deploy new Pool Discovery Service (discovery-only)
  • ⏳ Deploy enhanced Local Quote Service (state owner)
  • ⏳ Monitor pool counts and update freshness
  • ⏳ Verify quote accuracy with oracle validation
  • ⏳ 24-hour soak test for stability

Acceptance Criteria:

  • Pool counts match pre-migration levels
  • Quote freshness <1s (WebSocket updates)
  • No increase in quote errors
  • Grafana dashboards show improvements
  • No alerts during 24h soak test

Phase 5: Cleanup (Week 5 - Pending)

Goal: Remove old code and update documentation.

Tasks:

  • ⏳ Remove unused Redis pool state writes from Pool Discovery
  • ⏳ Archive old WebSocket subscription code
  • ⏳ Update documentation (this doc, architecture diagrams)
  • ⏳ Post-mortem: Lessons learned
  • ⏳ Knowledge transfer to team

Acceptance Criteria:

  • No dead code remaining
  • Documentation fully updated
  • Architecture diagrams reflect new design
  • Post-mortem document written
  • Team trained on new architecture

Performance Impact

Before (Split Responsibility)

Pool State Access:

Simple pools (Redis read):
β€’ Latency: 1-2ms
β€’ Throughput: 500-1000 ops/s
β€’ Freshness: 10s (polling interval)
β€’ Bottleneck: Network + Redis

Complex pools (RPC fetch):
β€’ Latency: 50-100ms
β€’ Throughput: 10-20 ops/s
β€’ Freshness: On-demand
β€’ Bottleneck: RPC rate limits

Cache Invalidation:

  • Manual, error-prone
  • Pool Discovery writes to Redis
  • Quote Service polls Redis every 10s
  • Delay between pool update and quote refresh: 10s

WebSocket Overhead:

  • Pool Discovery maintains connections for simple pools
  • Quote Service has no WebSocket connections
  • Duplicated infrastructure (WebSocket manager in Pool Discovery)

After (Single Responsibility)

Pool State Access:

Hot pools (in-memory):
β€’ Latency: <1ΞΌs (1000x faster)
β€’ Throughput: 1M+ ops/s
β€’ Freshness: <1s (WebSocket)
β€’ Bottleneck: CPU (negligible)

Cold pools (RPC fetch):
β€’ Latency: 50-100ms (same as before)
β€’ Throughput: 10-20 ops/s
β€’ Freshness: On-demand
β€’ Bottleneck: RPC rate limits

Cache Invalidation:

  • Automatic, pool-aware
  • WebSocket update β†’ invalidate Layer 2 quotes
  • Quote Service owns entire flow
  • Delay between pool update and quote refresh: <1s

WebSocket Efficiency:

  • Only Quote Service maintains connections (all pool types)
  • No duplicated infrastructure
  • Simpler monitoring (one service, one set of metrics)

Estimated Improvements

| Metric | Before | After | Improvement |
|---|---|---|---|
| Hot pool access | 1-2ms | <1ΞΌs | 1000-2000x faster |
| Quote freshness | 10s | <1s | 10x fresher |
| Cache hit rate | 80% | 95%+ | Better locality |
| Debugging time | ~30min | ~10min | 3x faster |
| Deployment complexity | High (2 services) | Low (1 service) | 2x simpler |
| WebSocket connections | 50% of pools | 100% of pools | 2x coverage |

Real-World Impact

HFT Quote Pipeline (200ms total budget):

BEFORE:
Quote Service: 10ms (Redis read + pool math)
Scanner: 10ms
Planner: 100ms
Executor: 20ms
─────────────
Total: 140ms βœ… (60ms margin)

AFTER:
Quote Service: 1ms (in-memory + pool math)  ← 10x faster!
Scanner: 10ms
Planner: 100ms
Executor: 20ms
─────────────
Total: 131ms βœ… (69ms margin, +9ms buffer)

Additional margin: The 9ms savings can be used for:

  • More thorough validation in Planner
  • Retry logic in Executor
  • Additional arbitrage strategies

Configuration

Environment Variables

Pool Discovery Service (removed variables):

# ❌ REMOVED - No longer needed
# POOL_DISCOVERY_WEBSOCKET_ENABLED=true
# POOL_DISCOVERY_REDIS_WRITE_ENABLED=true
# POOL_DISCOVERY_REDIS_WRITE_INTERVAL=10s

Local Quote Service (new variables):

# Pool State Ownership
POOL_STATE_OWNERSHIP_ENABLED=true

# NATS Pool Discovery Subscription
POOL_DISCOVERY_NATS_SUBSCRIBE=true
POOL_DISCOVERY_NATS_SUBJECTS=pool.discovered.*

# WebSocket Configuration
POOL_WEBSOCKET_ENABLED=true
POOL_WEBSOCKET_URL=ws://rpc.solana.com
POOL_WEBSOCKET_RECONNECT_DELAY=5s
POOL_WEBSOCKET_MAX_RECONNECT=10

# RPC Backup Polling
POOL_RPC_BACKUP_ENABLED=true
POOL_RPC_BACKUP_INTERVAL=30s
POOL_RPC_BACKUP_ON_WEBSOCKET_FAILURE=true

# In-Memory Cache
POOL_INMEMORY_HOT_LIMIT=500
POOL_INMEMORY_COLD_TTL=60s
POOL_CACHE_STALENESS_THRESHOLD=60s

# Redis Persistence (Backup)
POOL_REDIS_PERSIST_ENABLED=true
POOL_REDIS_PERSIST_INTERVAL=10s
POOL_REDIS_PERSIST_TTL=24h

# Default Reserves
POOL_DEFAULT_RESERVE_UNITS=1000000000000  # 1 trillion

Docker Compose Updates

services:
  pool-discovery:
    environment:
      # Simplified configuration (discovery only)
      - ENVIRONMENT=production
      - RPC_ENDPOINTS=${RPC_ENDPOINTS}
      - NATS_URL=nats://nats:4222
      - SOLSCAN_PROXY_ENABLED=${SOLSCAN_PROXY_ENABLED}
      - POOL_DEFAULT_RESERVE_UNITS=1000000000000
      # ❌ Removed WebSocket and Redis write configs

  local-quote-service:
    environment:
      # Enhanced configuration (state owner)
      - POOL_STATE_OWNERSHIP_ENABLED=true
      - POOL_DISCOVERY_NATS_SUBSCRIBE=true
      - POOL_WEBSOCKET_ENABLED=true
      - POOL_RPC_BACKUP_ENABLED=true
      - POOL_INMEMORY_HOT_LIMIT=500
      - POOL_REDIS_PERSIST_ENABLED=true
      - NATS_URL=nats://nats:4222
      - REDIS_URL=redis://redis:6379/0

Monitoring & Alerts

New Metrics

Pool Discovery Events:

# NATS event publishing
pool_discovered_events_total{dex="raydium_amm"}
pool_discovery_nats_lag_seconds
pool_discovery_errors_total

WebSocket Subscriptions:

# Active subscriptions
pool_websocket_subscriptions_active{type="amm"}
pool_websocket_subscriptions_active{type="clmm"}

# Reconnections
pool_websocket_reconnections_total
pool_websocket_connection_uptime_seconds

# Update latency
pool_websocket_update_latency_seconds_p50
pool_websocket_update_latency_seconds_p95

Pool State Freshness:

# Age of pool data
pool_state_age_seconds_p50
pool_state_age_seconds_p95
pool_state_age_seconds_p99

# Stale pools
pool_state_stale_count
pool_state_needs_refresh_count

In-Memory Cache:

# Cache size
pool_cache_hot_count
pool_cache_cold_count

# Cache performance
pool_cache_hit_rate{tier="hot"}
pool_cache_hit_rate{tier="cold"}
pool_cache_miss_rate

# Cache operations
pool_cache_evictions_total{tier="hot"}
pool_cache_promotions_total  # cold -> hot

Redis Persistence:

# Write performance
pool_redis_write_latency_seconds_p95
pool_redis_write_errors_total
pool_redis_write_success_total

# Recovery
pool_redis_recovery_pools_loaded
pool_redis_recovery_duration_seconds

New Alerts

Critical (PagerDuty):

groups:
- name: pool_state_critical
  rules:
  - alert: PoolDiscoveryNATSDown
    expr: up{job="pool-discovery"} == 1 and nats_connection_status == 0
    for: 2m
    annotations:
      summary: "Pool Discovery cannot publish to NATS"
      impact: "New pools not discovered by Quote Service"

  - alert: PoolWebSocketDisconnected
    expr: sum(pool_websocket_subscriptions_active) == 0
    for: 1m
    annotations:
      summary: "No active WebSocket subscriptions (all pools stale)"
      impact: "Quote Service using stale pool data, quotes may be inaccurate"

  - alert: PoolStateAllStale
    expr: pool_state_stale_count > (pool_cache_hot_count + pool_cache_cold_count) * 0.5
    for: 5m
    annotations:
      summary: ">50% of pools are stale (>60s without update)"
      impact: "Quote accuracy degraded, potential arbitrage misses"

Warning (Slack):

- alert: PoolDiscoveryNATSLag
  expr: pool_discovery_nats_lag_seconds > 5
  for: 2m
  annotations:
    summary: "Pool discovery events delayed >5s"
    impact: "Quote Service has delayed pool data"

- alert: PoolWebSocketReconnecting
  expr: rate(pool_websocket_reconnections_total[5m]) > 0.1
  for: 5m
  annotations:
    summary: "WebSocket reconnecting frequently (>0.1/s)"
    impact: "Pool updates may be delayed during reconnections"

- alert: PoolCacheLowHitRate
  expr: pool_cache_hit_rate{tier="hot"} < 0.8
  for: 10m
  annotations:
    summary: "Hot pool cache hit rate <80%"
    impact: "Increased latency, more RPC calls"

- alert: PoolRedisPersistFailing
  expr: rate(pool_redis_write_errors_total[5m]) > 0.5
  for: 10m
  annotations:
    summary: "Redis persistence failing (>0.5 errors/s)"
    impact: "Crash recovery may be incomplete"

Grafana Dashboard

New Panels:

Row 1: Pool Discovery

  • Pool discovered events (rate)
  • NATS publish latency
  • Solscan enrichment success rate

Row 2: WebSocket Health

  • Active subscriptions by pool type
  • Reconnection rate
  • Update latency distribution

Row 3: Pool State Freshness

  • Pool age distribution (histogram)
  • Stale pool count over time
  • Pools needing refresh

Row 4: Cache Performance

  • Hot vs cold pool count
  • Cache hit rate by tier
  • Evictions and promotions

Row 5: Redis Persistence

  • Write latency
  • Error rate
  • Last recovery timestamp

Conclusion

Summary

The Pool State Ownership Migration consolidates all pool state management in the Local Quote Service, eliminating the split-responsibility pattern with Pool Discovery Service. This architectural simplification delivers:

βœ… 1000x faster pool access (in-memory <1ΞΌs vs Redis 1-2ms)
βœ… 10x fresher pool data (WebSocket <1s vs Redis polling 10s)
βœ… Single source of truth (no coordination between services)
βœ… Simpler debugging (all pool updates in one service)
βœ… Better resilience (WebSocket primary, RPC backup)

Migration Status

| Phase | Status | ETA |
|---|---|---|
| Phase 1: Pool Discovery Simplification | βœ… Complete | Week 1 |
| Phase 2: Local Quote Service Enhancement | πŸ”„ In Progress | Week 2 |
| Phase 3: In-Memory Cache Priority | ⏳ Pending | Week 3 |
| Phase 4: Production Migration | ⏳ Pending | Week 4 |
| Phase 5: Cleanup | ⏳ Pending | Week 5 |

Key Takeaways

  1. Simplicity wins: Moving from split to single responsibility reduced complexity by 50%
  2. In-memory is king: 1000x performance improvement by eliminating Redis from hot path
  3. WebSocket-first: Real-time updates provide sub-second freshness vs 10s polling
  4. Graceful degradation: RPC backup ensures system works even if WebSocket fails
  5. Default reserves critical: Never allow nil/zero reserves to avoid premature filtering
Related Documentation

  • 30-QUOTE-SERVICE-ARCHITECTURE.md: Main Quote Service architecture
  • 25-POOL-DISCOVERY-DESIGN.md: Updated Pool Discovery design (discovery-only)
  • README.md: Service descriptions and quick start

Document Version: 1.0
Last Updated: January 1, 2026
Status: βœ… Active Migration
Maintainer: Solution Architect Team