Pool State Ownership Migration - Local Quote Service
Date: January 1, 2026 | Status: 🎯 Active Architecture (consolidating pool state management) | Version: 1.0 | Related Docs: 30-QUOTE-SERVICE-ARCHITECTURE.md, 25-POOL-DISCOVERY-DESIGN.md
Table of Contents
- Executive Summary
- Problem Statement
- Architecture Comparison
- New Responsibilities
- Pool State Update Flow
- Default Reserve Strategy
- WebSocket Subscription Management
- In-Memory Cache Architecture
- Redis Usage (Backup Only)
- Migration Timeline
- Performance Impact
- Configuration
- Monitoring & Alerts
Executive Summary
What Changed?
The Local Quote Service now owns ALL pool state management, consolidating responsibilities previously split between Pool Discovery Service and Quote Service. This architectural simplification eliminates dual-responsibility patterns and creates a single source of truth for pool data.
Key Decisions
| Aspect | Previous (Split) | New (Consolidated) |
|---|---|---|
| Pool Discovery | Discovery + WebSocket updates | Discovery only |
| Pool Updates | Split (simple→Redis, complex→RPC) | All in Quote Service |
| WebSocket | Pool Discovery manages | Quote Service manages |
| Primary Cache | Redis (1-2ms) | In-memory (<1μs) |
| State Owner | Unclear (both services) | Quote Service only |
Benefits
- ✅ 1000x faster pool access (in-memory <1μs vs Redis 1-2ms)
- ✅ Single source of truth (no coordination needed)
- ✅ Simpler debugging (all updates in one service)
- ✅ WebSocket-first updates (sub-second freshness)
- ✅ RPC backup (automatic fallback on WebSocket failure)
- ✅ Redis persistence (crash recovery, optional sharing)
Problem Statement
Previous Architecture Issues
The Problem: Pool state management was split between two services with unclear boundaries.
```
─────────────────────────────────────────────────────────────
 POOL DISCOVERY SERVICE
─────────────────────────────────────────────────────────────
 Responsibility 1: Discover new pools
 Responsibility 2: Update simple pools via WebSocket  ⚠
 Responsibility 3: Write to Redis                     ⚠
─────────────────────────────────────────────────────────────
                        │
                        ▼
              Redis (1-2ms latency)
                        │
                        ▼
─────────────────────────────────────────────────────────────
 LOCAL QUOTE SERVICE
─────────────────────────────────────────────────────────────
 • Read simple pools from Redis                       ⚠
 • Fetch complex pools via RPC (can't avoid anyway)
 • No WebSocket subscriptions                         ⚠
─────────────────────────────────────────────────────────────
```
Issues:
- Dual Responsibility: Pool Discovery had to maintain WebSocket subscriptions for simple pools, mixing concerns
- Redis Bottleneck: 1-2ms latency vs <1μs in-memory (1000x slower)
- Couldn't Avoid RPC Anyway: Complex pools (CLMM/DLMM) have tick/bin data too large for Redis, so Quote Service needed RPC anyway
- Debugging Complexity: Pool state changes tracked in two different services
- Coordination Required: Deploying one service required understanding impact on the other
The Insight
"If Quote Service needs RPC for complex pools anyway (CLMM ticks too large for Redis), why not have it own ALL pool state updates? Then Pool Discovery can focus solely on discovery."
This realization led to the new architecture.
Architecture Comparison
Previous Architecture (Split Responsibility)
```
─────────────────────────────────────────────────────────────
 POOL DISCOVERY SERVICE (Dual Responsibility)
─────────────────────────────────────────────────────────────
 PRIMARY: Discover new pools
   ├─ RPC: GetProgramAccountsWithOpts
   ├─ Filter by token pair
   ├─ Solscan enrichment (TVL, reserves)
   └─ NATS: pool.discovered event

 SECONDARY: Update simple pools            ⚠ PROBLEM
   ├─ WebSocket: accountSubscribe (AMM/CPMM pools)
   ├─ Decode vault balances
   ├─ Write to Redis
   └─ Complex pools ignored (too large for Redis)
─────────────────────────────────────────────────────────────
                        │
                        ▼
              Redis (pool state cache)
                1-2ms read latency
                        │
                        ▼
─────────────────────────────────────────────────────────────
 LOCAL QUOTE SERVICE (Hybrid Data Access)
─────────────────────────────────────────────────────────────
 • Read simple pools from Redis (AMM/CPMM)
 • Fetch complex pools via RPC (CLMM/DLMM)
 • Cache in-memory (but sourced from Redis initially)
 • No WebSocket subscriptions
 • Background refresh (re-read Redis every 10s)
─────────────────────────────────────────────────────────────
```
PROBLEMS:
- ❌ Pool Discovery maintains WebSocket just for simple pools
- ❌ Quote Service reads stale data from Redis (10s polling)
- ❌ Quote Service needs RPC anyway for CLMM/DLMM
- ❌ Redis is 1000x slower than in-memory
- ❌ Unclear ownership: who is the source of truth?
New Architecture (Single Responsibility)
```
─────────────────────────────────────────────────────────────
 POOL DISCOVERY SERVICE (Discovery Only)
─────────────────────────────────────────────────────────────
 SOLE RESPONSIBILITY: Discover new pools

 1. RPC Scanning (GetProgramAccountsWithOpts)
    ├─ Raydium AMM V4, CPMM, CLMM
    ├─ Meteora DLMM
    ├─ PumpSwap AMM
    └─ Orca Whirlpools

 2. Pool Enrichment (Solscan API - initial only)
    ├─ TVL (liquidity USD)
    ├─ Reserve amounts (base + quote)
    ├─ Token decimals
    └─ Default to 1T units if enrichment fails

 3. Event Publishing (NATS)
    ├─ pool.discovered (new pool found)
    └─ pool.enriched (Solscan data added)

 ❌ NO WebSocket subscriptions
 ❌ NO pool state updates
 ❌ NO Redis writes
─────────────────────────────────────────────────────────────
                        │
                        ▼
              NATS: pool.discovered.*
                        │
                        ▼
─────────────────────────────────────────────────────────────
 LOCAL QUOTE SERVICE (State Owner)
─────────────────────────────────────────────────────────────
 SOLE RESPONSIBILITY: Own all pool state

 1. Pool Discovery Event Handling
    ├─ Subscribe: NATS pool.discovered.*
    ├─ Add to in-memory cache
    ├─ Initialize with default reserves (1T units)
    └─ Subscribe to WebSocket updates

 2. Pool State Updates (ALL pools - WebSocket first)
    WebSocket Subscriptions (real-time, primary):
    ├─ AMM/CPMM: accountSubscribe (vault balances)
    ├─ CLMM: accountSubscribe (pool + tick arrays)
    ├─ DLMM: accountSubscribe (pool + bin arrays)
    ├─ Sub-second latency
    └─ Auto-reconnect with exponential backoff

    RPC Polling (backup, 30s interval):
    ├─ Triggered on WebSocket failure
    ├─ Fetch vault balances (AMM/CPMM)
    ├─ Fetch tick arrays (CLMM)
    └─ Fetch bin arrays (DLMM)

 3. In-Memory Cache (Primary, 1000x faster)
    HOT POOLS (actively quoted):
    ├─ <1μs access time
    ├─ LRU eviction (keep 500 hottest)
    └─ 1000x faster than Redis

    COLD POOLS (rarely quoted):
    ├─ Fetch on-demand from RPC
    └─ Cache for 60s

 4. Redis Persistence (Backup/Recovery only)
    ├─ Async write every 10s (non-blocking)
    ├─ Crash recovery: read on startup
    ├─ Cross-instance sharing (optional)
    └─ NOT the primary data source

 5. Pool State Validation
    ├─ Oracle price deviation alerts (>1%)
    ├─ Reserve sanity checks (never nil/zero)
    └─ Freshness tracking (mark stale if >60s)
─────────────────────────────────────────────────────────────
```
BENEFITS:
- ✅ Single source of truth (no coordination needed)
- ✅ 1000x faster (<1μs in-memory vs 1-2ms Redis)
- ✅ WebSocket-first (sub-second updates vs 10s polling)
- ✅ RPC backup (automatic fallback)
- ✅ Simpler debugging (all pool updates in one service)
- ✅ Clear ownership (Quote Service owns state)
New Responsibilities
Pool Discovery Service (Simplified)
What It Does:
- ✅ Discover new pools via RPC scanning
- ✅ Enrich pools with Solscan API (initial TVL/reserves)
- ✅ Publish NATS events when pools are found
- ✅ Set default reserves (1T units) if enrichment fails
What It Does NOT Do:
- ❌ WebSocket subscriptions
- ❌ Pool state updates
- ❌ Redis writes
- ❌ Quote calculations
Code Example:
```go
// Pool Discovery: simplified to discovery only
func (s *PoolDiscoveryService) ScanForPools(ctx context.Context, baseMint, quoteMint string) error {
	// 1. RPC scan for pools
	pools, err := s.protocolClient.FetchPoolsByPair(ctx, baseMint, quoteMint)
	if err != nil {
		return err
	}

	for _, pool := range pools {
		// 2. Set default reserves (NEVER nil or zero)
		if pool.BaseReserve.IsNil() || pool.BaseReserve.IsZero() {
			pool.BaseReserve = math.NewInt(1_000_000_000_000) // 1 trillion
		}
		if pool.QuoteReserve.IsNil() || pool.QuoteReserve.IsZero() {
			pool.QuoteReserve = math.NewInt(1_000_000_000_000) // 1 trillion
		}

		// 3. Enrich with Solscan (best effort, non-blocking)
		go func(p *domain.Pool) {
			enriched, err := s.solscanEnricher.EnrichPool(ctx, p)
			if err != nil {
				observability.LogWarn("Solscan enrichment failed, using defaults",
					observability.String("poolId", p.ID[:8]),
					observability.Error(err))
				// Use the default reserves already set
				s.publishPoolDiscovered(p)
				return
			}
			// Use enriched data
			s.publishPoolDiscovered(enriched)
		}(pool)
	}
	return nil
}

func (s *PoolDiscoveryService) publishPoolDiscovered(pool *domain.Pool) {
	// Publish to NATS: pool.discovered.*
	event := &PoolDiscoveredEvent{
		PoolID:       pool.ID,
		ProgramID:    pool.ProgramID,
		DEX:          pool.DEX,
		BaseMint:     pool.BaseMint,
		QuoteMint:    pool.QuoteMint,
		BaseReserve:  pool.BaseReserve.String(),
		QuoteReserve: pool.QuoteReserve.String(),
		LiquidityUSD: pool.LiquidityUSD,
		Timestamp:    time.Now().Unix(),
	}
	s.natsPublisher.Publish("pool.discovered."+pool.DEX, event)
}
```
Local Quote Service (Enhanced)
New Responsibilities:
- ✅ Subscribe to NATS pool.discovered.* events
- ✅ Manage ALL WebSocket subscriptions (all pool types)
- ✅ Handle WebSocket account updates (decode, update cache)
- ✅ RPC backup polling (30s interval on WebSocket failure)
- ✅ Maintain in-memory cache (hot pool priority)
- ✅ Persist to Redis (async, for crash recovery)
- ✅ Validate pool state (oracle deviation, reserve sanity)
Code Example:
```go
// Local Quote Service: enhanced pool state ownership
func (s *LocalQuoteService) Start(ctx context.Context) error {
	// 1. Load persisted pools from Redis (crash recovery)
	if err := s.loadPoolsFromRedis(ctx); err != nil {
		observability.LogWarn("Failed to load pools from Redis", observability.Error(err))
	}

	// 2. Subscribe to pool discovery events
	go s.subscribeToPoolDiscovery(ctx)

	// 3. Start WebSocket manager
	go s.wsManager.Start(ctx)

	// 4. Start RPC backup poller
	go s.rpcBackupPoller(ctx)

	// 5. Start Redis persistence worker
	go s.redisPersistenceWorker(ctx)

	return nil
}

func (s *LocalQuoteService) subscribeToPoolDiscovery(ctx context.Context) {
	// Subscribe to NATS: pool.discovered.*
	sub, err := s.natsClient.Subscribe("pool.discovered.*", func(msg *nats.Msg) {
		var event PoolDiscoveredEvent
		if err := json.Unmarshal(msg.Data, &event); err != nil {
			observability.LogError("Failed to unmarshal pool discovered event",
				observability.Error(err))
			return
		}
		s.handlePoolDiscovered(ctx, &event)
	})
	if err != nil {
		observability.LogError("Failed to subscribe to pool discovery",
			observability.Error(err))
		return
	}
	observability.LogInfo("Subscribed to pool discovery events")

	// Keep the subscription alive until shutdown
	<-ctx.Done()
	sub.Unsubscribe()
}

func (s *LocalQuoteService) handlePoolDiscovered(ctx context.Context, event *PoolDiscoveredEvent) {
	// Convert event to pool
	pool := s.eventToPool(event)

	// Add to in-memory cache
	s.poolCache.Set(pool.ID, &PoolState{
		Pool:       pool,
		LastUpdate: time.Now(),
		IsStale:    false,
	})
	observability.LogInfo("Pool added to cache",
		observability.String("poolId", pool.ID[:8]),
		observability.String("dex", pool.DEX),
		observability.Uint64("baseReserve", pool.BaseReserve.Uint64()),
		observability.Uint64("quoteReserve", pool.QuoteReserve.Uint64()))

	// Subscribe to WebSocket updates
	if err := s.wsManager.SubscribePool(pool.ID, pool.ProgramID); err != nil {
		observability.LogError("Failed to subscribe to pool updates",
			observability.String("poolId", pool.ID[:8]),
			observability.Error(err))
	}

	// Queue a high-priority RPC fetch to replace defaults with fresh data
	s.refreshQueue <- RefreshCommand{
		PoolID:   pool.ID,
		PoolType: pool.Type,
		Priority: HIGH,
	}
}
```
Pool State Update Flow
Real-Time Updates (WebSocket - Primary)
```
1. Pool Discovery publishes NATS event
   pool.discovered.raydium_amm
   {
     poolId: "7xKX...",
     baseReserve:  "1000000000000",  // 1T default
     quoteReserve: "1000000000000",  // 1T default
     ...
   }
        │
        ▼
2. Local Quote Service receives event
   • Parse pool data
   • Add to in-memory cache (default reserves)
   • Subscribe to WebSocket: accountSubscribe(poolId)
        │
        ▼
3. WebSocket connection established
   • Solana RPC WebSocket: ws://rpc.solana.com
   • accountSubscribe request sent
   • Subscription ID: 12345
        │
        ▼
4. Account update received (<1s latency)
   {
     "subscription": 12345,
     "result": {
       "value": {
         "data": ["base64EncodedAccountData", "base64"],
         "slot": 234567890
       }
     }
   }
        │
        ▼
5. Decode account data (pool-specific)
   • Base64 decode
   • Parse binary layout (AMM/CLMM/DLMM specific)
   • Extract vault balances / tick data / bin data
   • Calculate reserves
        │
        ▼
6. Update in-memory cache (atomic)
   pool.BaseReserve  = actualVaultBalance
   pool.QuoteReserve = actualVaultBalance
   pool.LastUpdate   = time.Now()
   pool.IsStale      = false
        │
        ▼
7. Invalidate Layer 2 quote cache (pool-aware)
   • Find all quotes using this pool
   • Invalidate ALL matching quotes
   • Ensures next quote request uses fresh data
        │
        ▼
8. Persist to Redis (async, non-blocking)
   • Marshal pool state to JSON
   • Write to Redis (fire-and-forget)
   • Log errors but don't block
        │
        ▼
   Next quote uses fresh state
```
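Step 7 (pool-aware quote invalidation) is described here but not shown in code elsewhere in this document. A minimal sketch of one way to index cached quotes by the pools their routes touch, so one pool update evicts exactly the affected quotes; all type and function names below (`quoteInvalidator`, `Put`, `InvalidatePool`) are illustrative, not the service's actual API:

```go
package main

// quoteInvalidator keeps a forward index (quote -> payload) and a reverse
// index (pool -> quotes) so a single pool update can evict every cached
// quote whose route includes that pool, and nothing else.
type quoteInvalidator struct {
	quotes map[string][]byte          // quoteKey -> cached quote payload
	byPool map[string]map[string]bool // poolID -> set of quoteKeys
}

func newQuoteInvalidator() *quoteInvalidator {
	return &quoteInvalidator{
		quotes: make(map[string][]byte),
		byPool: make(map[string]map[string]bool),
	}
}

// Put caches a quote and records every pool its route depends on.
func (q *quoteInvalidator) Put(quoteKey string, payload []byte, poolIDs []string) {
	q.quotes[quoteKey] = payload
	for _, id := range poolIDs {
		if q.byPool[id] == nil {
			q.byPool[id] = make(map[string]bool)
		}
		q.byPool[id][quoteKey] = true
	}
}

// InvalidatePool drops every cached quote that uses the updated pool and
// returns how many quotes were actually evicted.
func (q *quoteInvalidator) InvalidatePool(poolID string) int {
	n := 0
	for quoteKey := range q.byPool[poolID] {
		if _, ok := q.quotes[quoteKey]; ok {
			delete(q.quotes, quoteKey)
			n++
		}
	}
	delete(q.byPool, poolID)
	return n
}
```

A production version would need locking and would share the route metadata with the quote engine, but the two-index shape is the core of "find all quotes using this pool".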
Backup Updates (RPC Polling - Secondary)
```
Every 30s OR on WebSocket disconnect
        │
        ▼
1. Check pool staleness
   • Pool LastUpdate > 60s = stale
   • Mark as stale
   • Queue for refresh
        │
        ▼
2. Fetch fresh data from RPC
   AMM/CPMM:
   • getMultipleAccounts(vaultAddresses)
   • Parse vault balances

   CLMM:
   • getAccountInfo(poolAddress)
   • getMultipleAccounts(tickArrayAddresses)
   • Parse tick data

   DLMM:
   • getAccountInfo(poolAddress)
   • getMultipleAccounts(binArrayAddresses)
   • Parse bin data
        │
        ▼
3. Update in-memory cache
   • Same as WebSocket update flow
   • Invalidate Layer 2 quotes
   • Persist to Redis
```
Default Reserve Strategy
Problem
Newly discovered pools may not have enriched data from Solscan API yet due to:
- API rate limits
- Network failures
- Pool too new (not indexed by Solscan)
- Solscan circuit breaker open
Solution
Set large default reserves (1 trillion units) to avoid premature filtering.
Key Principles:
- Never nil: Reserves must always have a value
- Never zero: Zero reserves break pool math
- Large defaults: 1T units ensures pool not filtered by liquidity checks
- High priority refresh: RPC/WebSocket fetches actual values immediately
Implementation
Pool Discovery Service:
```go
func (s *PoolDiscoveryService) scanPool(ctx context.Context, pool *domain.Pool) {
	// Set defaults FIRST (before enrichment)
	if pool.BaseReserve.IsNil() || pool.BaseReserve.IsZero() {
		pool.BaseReserve = math.NewInt(1_000_000_000_000) // 1 trillion
		observability.LogInfo("Set default base reserve",
			observability.String("poolId", pool.ID[:8]))
	}
	if pool.QuoteReserve.IsNil() || pool.QuoteReserve.IsZero() {
		pool.QuoteReserve = math.NewInt(1_000_000_000_000) // 1 trillion
		observability.LogInfo("Set default quote reserve",
			observability.String("poolId", pool.ID[:8]))
	}

	// Try to enrich (best effort)
	enriched, err := s.solscanEnricher.EnrichPool(ctx, pool)
	if err != nil {
		observability.LogWarn("Solscan enrichment failed, using defaults",
			observability.String("poolId", pool.ID[:8]),
			observability.Error(err))
		// Publish with defaults
		s.publishPoolDiscovered(pool)
		return
	}
	// Publish with enriched data
	s.publishPoolDiscovered(enriched)
}
```
Local Quote Service:
```go
func (s *LocalQuoteService) handlePoolDiscovered(ctx context.Context, event *PoolDiscoveredEvent) {
	pool := s.eventToPool(event)

	// Validate reserves (should never be nil/zero at this point)
	if pool.BaseReserve.IsNil() || pool.BaseReserve.IsZero() {
		observability.LogError("Received pool with nil/zero base reserve",
			observability.String("poolId", pool.ID[:8]))
		// Re-apply defaults
		pool.BaseReserve = math.NewInt(1_000_000_000_000)
	}
	if pool.QuoteReserve.IsNil() || pool.QuoteReserve.IsZero() {
		observability.LogError("Received pool with nil/zero quote reserve",
			observability.String("poolId", pool.ID[:8]))
		// Re-apply defaults
		pool.QuoteReserve = math.NewInt(1_000_000_000_000)
	}

	// Add to cache
	s.poolCache.Set(pool.ID, &PoolState{
		Pool:         pool,
		LastUpdate:   time.Now(),
		IsStale:      false,
		NeedsRefresh: true, // mark for immediate refresh
	})

	// Subscribe to WebSocket (will deliver actual values)
	s.wsManager.SubscribePool(pool.ID, pool.ProgramID)

	// Queue HIGH-priority RPC fetch (verify actual reserves)
	s.refreshQueue <- RefreshCommand{
		PoolID:   pool.ID,
		Priority: HIGH,
	}
}
```
WebSocket Subscription Management
Architecture
```go
type WebSocketManager struct {
	conn           *websocket.Conn
	rpcURL         string
	subscriptions  map[string]uint64            // poolID -> subscriptionID
	reverseMap     map[uint64]string            // subscriptionID -> poolID
	updateHandlers map[string]PoolUpdateHandler // poolID -> handler
	reconnectDelay time.Duration
	maxReconnect   int
	mu             sync.RWMutex
}

type PoolUpdateHandler interface {
	HandleUpdate(accountData []byte, slot uint64) error
	GetPoolType() PoolType
}
```
Subscription Flow
```go
func (w *WebSocketManager) SubscribePool(poolID, programID string) error {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Check if already subscribed
	if _, exists := w.subscriptions[poolID]; exists {
		observability.LogDebug("Pool already subscribed",
			observability.String("poolId", poolID[:8]))
		return nil
	}

	// Build accountSubscribe request
	req := &rpc.AccountSubscribeRequest{
		Account: poolID,
		Config: &rpc.AccountSubscribeConfig{
			Encoding:   "base64",
			Commitment: "confirmed",
		},
	}

	// Send subscription
	subID, err := w.conn.AccountSubscribe(req)
	if err != nil {
		return fmt.Errorf("failed to subscribe: %w", err)
	}

	// Track subscription (bidirectional maps)
	w.subscriptions[poolID] = subID
	w.reverseMap[subID] = poolID

	observability.LogInfo("Subscribed to pool updates",
		observability.String("poolId", poolID[:8]),
		observability.Uint64("subscriptionId", subID))

	// Record metric
	metrics.RecordPoolSubscription(poolID, programID)
	return nil
}
```
Update Handling
```go
func (w *WebSocketManager) handleAccountNotification(notification *rpc.AccountNotification) {
	// Find pool by subscription ID
	w.mu.RLock()
	poolID, exists := w.reverseMap[notification.Subscription]
	w.mu.RUnlock()
	if !exists {
		observability.LogWarn("Received notification for unknown subscription",
			observability.Uint64("subscriptionId", notification.Subscription))
		return
	}

	// Decode account data (base64 -> bytes)
	accountData, err := base64.StdEncoding.DecodeString(notification.Result.Value.Data[0])
	if err != nil {
		observability.LogError("Failed to decode account data",
			observability.String("poolId", poolID[:8]),
			observability.Error(err))
		return
	}

	// Get the update handler for this pool
	w.mu.RLock()
	handler, exists := w.updateHandlers[poolID]
	w.mu.RUnlock()
	if !exists {
		observability.LogWarn("No update handler for pool",
			observability.String("poolId", poolID[:8]))
		return
	}

	// Handle update (pool-specific decoding)
	if err := handler.HandleUpdate(accountData, notification.Result.Value.Slot); err != nil {
		observability.LogError("Failed to handle pool update",
			observability.String("poolId", poolID[:8]),
			observability.Error(err))
		return
	}

	// Record metric
	metrics.RecordPoolUpdate(poolID, time.Since(notification.Result.Value.Timestamp))
}
```
Reconnection Logic
```go
func (w *WebSocketManager) Start(ctx context.Context) {
	reconnectAttempts := 0
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		// Connect to the WebSocket endpoint
		conn, err := websocket.Dial(w.rpcURL)
		if err != nil {
			reconnectAttempts++
			delay := w.calculateBackoff(reconnectAttempts)
			observability.LogWarn("WebSocket connection failed, retrying",
				observability.Error(err),
				observability.Int("attempt", reconnectAttempts),
				observability.Duration("delay", delay))
			time.Sleep(delay)
			continue
		}
		w.conn = conn
		reconnectAttempts = 0
		observability.LogInfo("WebSocket connected")

		// Resubscribe to all pools
		w.resubscribeAll()

		// Listen for messages (blocks until the connection drops)
		w.listen(ctx)

		// Connection closed, reconnect
		observability.LogWarn("WebSocket connection closed, reconnecting")
	}
}

func (w *WebSocketManager) calculateBackoff(attempts int) time.Duration {
	// Exponential backoff: 1s, 2s, 4s, 8s, 16s, capped at 30s
	delay := time.Duration(1<<uint(attempts-1)) * time.Second
	if delay > 30*time.Second {
		delay = 30 * time.Second
	}
	return delay
}

func (w *WebSocketManager) resubscribeAll() {
	w.mu.RLock()
	poolIDs := make([]string, 0, len(w.subscriptions))
	for poolID := range w.subscriptions {
		poolIDs = append(poolIDs, poolID)
	}
	w.mu.RUnlock()

	// Clear subscription IDs from the old connection
	w.mu.Lock()
	w.subscriptions = make(map[string]uint64)
	w.reverseMap = make(map[uint64]string)
	w.mu.Unlock()

	// Resubscribe on the new connection
	for _, poolID := range poolIDs {
		if err := w.SubscribePool(poolID, ""); err != nil {
			observability.LogError("Failed to resubscribe pool",
				observability.String("poolId", poolID[:8]),
				observability.Error(err))
		}
	}
	observability.LogInfo("Resubscribed to all pools",
		observability.Int("count", len(poolIDs)))
}
```
In-Memory Cache Architecture
Design
```go
type InMemoryPoolCache struct {
	hotPools  *lru.Cache // LRU cache for frequently accessed pools
	coldPools sync.Map   // infrequently accessed pools
	hotLimit  int        // max hot pool count (default: 500)
	mu        sync.RWMutex
}

type PoolState struct {
	Pool         *domain.Pool
	CLMMData     *CLMMPoolData // tick arrays for CLMM
	DLMMData     *DLMMPoolData // bin arrays for DLMM
	LastUpdate   time.Time
	RefreshCount int
	IsStale      bool
	NeedsRefresh bool
	mu           sync.RWMutex
}
```
Hot vs Cold Pools
Hot Pools (LRU cache, 500 limit):
- Actively quoted (accessed in last 60s)
- Kept in memory for instant access (<1ΞΌs)
- LRU eviction when cache full
- Evicted pools move to cold storage
Cold Pools (sync.Map):
- Rarely quoted (>60s since last access)
- Fetched from RPC on-demand
- Cached for 60s, then discarded
- Promoted to hot if quoted again
Access Pattern
```go
func (c *InMemoryPoolCache) Get(poolID string) (*PoolState, error) {
	// 1. Check hot cache (LRU)
	if val, ok := c.hotPools.Get(poolID); ok {
		observability.LogDebug("Hot pool cache hit",
			observability.String("poolId", poolID[:8]))
		metrics.RecordCacheHit("hot")
		return val.(*PoolState), nil
	}

	// 2. Check cold cache (sync.Map)
	if val, ok := c.coldPools.Load(poolID); ok {
		poolState := val.(*PoolState)
		// Serve it if still fresh (<60s old)
		if time.Since(poolState.LastUpdate) < 60*time.Second {
			observability.LogDebug("Cold pool cache hit",
				observability.String("poolId", poolID[:8]))
			metrics.RecordCacheHit("cold")
			// Promote to hot on repeat access
			c.promoteToHot(poolID, poolState)
			return poolState, nil
		}
		// Stale, remove from cold
		c.coldPools.Delete(poolID)
	}

	// 3. Cache miss: fetch from RPC
	observability.LogDebug("Pool cache miss, fetching from RPC",
		observability.String("poolId", poolID[:8]))
	metrics.RecordCacheMiss()
	poolState, err := c.fetchPoolFromRPC(poolID)
	if err != nil {
		return nil, err
	}

	// Add to cold cache
	c.coldPools.Store(poolID, poolState)
	return poolState, nil
}

func (c *InMemoryPoolCache) promoteToHot(poolID string, poolState *PoolState) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Add to hot cache (LRU evicts the oldest entry if full)
	c.hotPools.Add(poolID, poolState)
	// Remove from cold
	c.coldPools.Delete(poolID)

	observability.LogDebug("Promoted pool to hot cache",
		observability.String("poolId", poolID[:8]))
	metrics.RecordCachePromotion()
}
```
Performance Characteristics
| Operation | Hot Cache | Cold Cache | RPC Fetch |
|---|---|---|---|
| Access Time | <1μs | ~10μs | 50-100ms |
| Capacity | 500 pools | Unlimited | N/A |
| Eviction | LRU | 60s TTL | N/A |
| Thread Safety | Mutex | sync.Map | N/A |
| Promotion | Yes (from cold) | No | To cold |
Redis Usage (Backup Only)
New Role
Redis is no longer the primary data source. It serves two purposes:
- Crash recovery: Restore pool state on service restart
- Cross-instance sharing: Optional, if running 2+ Local Quote Service instances
Write Pattern (Async, Non-Blocking)
```go
func (s *LocalQuoteService) redisPersistenceWorker(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			s.persistPoolsToRedis(ctx)
		case <-ctx.Done():
			// Final persist before shutdown
			s.persistPoolsToRedis(context.Background())
			return
		}
	}
}

func (s *LocalQuoteService) persistPoolsToRedis(ctx context.Context) {
	// Snapshot the hot pools
	hotPools := s.poolCache.GetAllHotPools()
	observability.LogDebug("Persisting pools to Redis",
		observability.Int("count", len(hotPools)))

	// Persist each pool asynchronously (non-blocking)
	for _, poolState := range hotPools {
		go func(ps *PoolState) {
			// Marshal to JSON
			data, err := json.Marshal(ps.Pool)
			if err != nil {
				observability.LogError("Failed to marshal pool",
					observability.String("poolId", ps.Pool.ID[:8]),
					observability.Error(err))
				metrics.RecordRedisPersistError()
				return
			}
			// Write to Redis (fire-and-forget, 24h TTL)
			key := fmt.Sprintf("pool:state:%s", ps.Pool.ID)
			if err := s.redisClient.Set(ctx, key, data, 24*time.Hour).Err(); err != nil {
				observability.LogError("Failed to persist pool to Redis",
					observability.String("poolId", ps.Pool.ID[:8]),
					observability.Error(err))
				metrics.RecordRedisPersistError()
				return
			}
			metrics.RecordRedisPersistSuccess()
		}(poolState)
	}
}
```
Read Pattern (Startup Only)
```go
func (s *LocalQuoteService) loadPoolsFromRedis(ctx context.Context) error {
	observability.LogInfo("Loading pools from Redis (crash recovery)")

	// Scan for all pool keys
	var cursor uint64
	var keys []string
	for {
		var batch []string
		var err error
		batch, cursor, err = s.redisClient.Scan(ctx, cursor, "pool:state:*", 100).Result()
		if err != nil {
			return fmt.Errorf("failed to scan Redis: %w", err)
		}
		keys = append(keys, batch...)
		if cursor == 0 {
			break
		}
	}
	observability.LogInfo("Found pools in Redis",
		observability.Int("count", len(keys)))

	// Load pools in parallel
	var wg sync.WaitGroup
	poolsChan := make(chan *domain.Pool, len(keys))
	for _, key := range keys {
		wg.Add(1)
		go func(k string) {
			defer wg.Done()
			// Get pool data
			data, err := s.redisClient.Get(ctx, k).Bytes()
			if err != nil {
				observability.LogError("Failed to get pool from Redis",
					observability.String("key", k),
					observability.Error(err))
				return
			}
			// Unmarshal
			var pool domain.Pool
			if err := json.Unmarshal(data, &pool); err != nil {
				observability.LogError("Failed to unmarshal pool",
					observability.String("key", k),
					observability.Error(err))
				return
			}
			poolsChan <- &pool
		}(key)
	}

	// Wait for all loads to complete
	wg.Wait()
	close(poolsChan)

	// Add pools to the cache
	loadedCount := 0
	for pool := range poolsChan {
		s.poolCache.Set(pool.ID, &PoolState{
			Pool:         pool,
			LastUpdate:   time.Now(),
			IsStale:      true, // mark stale, will refresh
			NeedsRefresh: true,
		})
		// Subscribe to WebSocket
		s.wsManager.SubscribePool(pool.ID, pool.ProgramID)
		// Queue RPC refresh (verify freshness)
		s.refreshQueue <- RefreshCommand{
			PoolID:   pool.ID,
			Priority: NORMAL,
		}
		loadedCount++
	}

	observability.LogInfo("Loaded pools from Redis",
		observability.Int("loaded", loadedCount),
		observability.Int("failed", len(keys)-loadedCount))
	return nil
}
```
Key Points:
- ✅ Redis reads are startup only (not on the hot path)
- ✅ Pools loaded from Redis are marked stale and queued for refresh
- ✅ WebSocket subscriptions are re-established immediately
- ✅ RPC refresh verifies actual pool state (don't trust stale Redis data)
- ✅ After 30-60s, all pools are refreshed with live data
Migration Timeline
Phase 1: Pool Discovery Simplification (Week 1 - Done)
Goal: Remove WebSocket and Redis writes from Pool Discovery Service.
Tasks:
- ✅ Remove WebSocket subscription code
- ✅ Remove Redis write logic for pool updates
- ✅ Keep only RPC scanning and NATS event publishing
- ✅ Add default reserve logic (1T units if nil/zero)
- ✅ Test discovery-only functionality
Acceptance Criteria:
- Pool Discovery only scans and publishes events
- No WebSocket connections from Pool Discovery
- No Redis writes from Pool Discovery
- Default reserves set for all discovered pools
Phase 2: Local Quote Service Enhancement (Week 2 - In Progress)
Goal: Add pool state ownership to Local Quote Service.
Tasks:
- 🔄 Add NATS subscription for pool.discovered.* events
- 🔄 Implement WebSocket manager with reconnection logic
- 🔄 Add pool account subscription handling
- 🔄 Implement RPC backup polling (30s interval)
- 🔄 Test WebSocket updates and RPC fallback
Acceptance Criteria:
- Quote Service subscribes to pool discovery events
- WebSocket subscriptions working for all pool types
- Account updates decoded and cache updated
- RPC backup kicks in on WebSocket failure
- Reconnection works after network issues
Phase 3: In-Memory Cache Priority (Week 3 - Pending)
Goal: Implement performant in-memory cache with Redis backup.
Tasks:
- ⏳ Implement LRU cache for hot pools (500 pool limit)
- ⏳ Add Redis async persistence (10s interval)
- ⏳ Implement crash recovery (read Redis on startup)
- ⏳ Test cache performance and persistence
- ⏳ Measure latency improvement (1ms → <1μs)
Acceptance Criteria:
- Hot pool access <1ΞΌs latency
- LRU eviction working correctly
- Redis persistence every 10s (async)
- Crash recovery restores pools from Redis
- Performance improvement measured and documented
Phase 4: Production Migration (Week 4 - Pending)
Goal: Deploy new architecture to production.
Tasks:
- ⏳ Deploy new Pool Discovery Service (discovery-only)
- ⏳ Deploy enhanced Local Quote Service (state owner)
- ⏳ Monitor pool counts and update freshness
- ⏳ Verify quote accuracy with oracle validation
- ⏳ 24-hour soak test for stability
Acceptance Criteria:
- Pool counts match pre-migration levels
- Quote freshness <1s (WebSocket updates)
- No increase in quote errors
- Grafana dashboards show improvements
- No alerts during 24h soak test
Phase 5: Cleanup (Week 5 - Pending)
Goal: Remove old code and update documentation.
Tasks:
- ⏳ Remove unused Redis pool state writes from Pool Discovery
- ⏳ Archive old WebSocket subscription code
- ⏳ Update documentation (this doc, architecture diagrams)
- ⏳ Post-mortem: lessons learned
- ⏳ Knowledge transfer to team
Acceptance Criteria:
- No dead code remaining
- Documentation fully updated
- Architecture diagrams reflect new design
- Post-mortem document written
- Team trained on new architecture
Performance Impact
Before (Split Responsibility)
Pool State Access:
Simple pools (Redis read):
• Latency: 1-2ms
• Throughput: 500-1000 ops/s
• Freshness: 10s (polling interval)
• Bottleneck: Network + Redis
Complex pools (RPC fetch):
• Latency: 50-100ms
• Throughput: 10-20 ops/s
• Freshness: On-demand
• Bottleneck: RPC rate limits
Cache Invalidation:
- Manual, error-prone
- Pool Discovery writes to Redis
- Quote Service polls Redis every 10s
- Delay between pool update and quote refresh: 10s
WebSocket Overhead:
- Pool Discovery maintains connections for simple pools
- Quote Service has no WebSocket connections
- Duplicated infrastructure (WebSocket manager in Pool Discovery)
After (Single Responsibility)
Pool State Access:
Hot pools (in-memory):
• Latency: <1μs (1000x faster)
• Throughput: 1M+ ops/s
• Freshness: <1s (WebSocket)
• Bottleneck: CPU (negligible)
Cold pools (RPC fetch):
• Latency: 50-100ms (same as before)
• Throughput: 10-20 ops/s
• Freshness: On-demand
• Bottleneck: RPC rate limits
Cache Invalidation:
- Automatic, pool-aware
- WebSocket update → invalidate Layer 2 quotes
- Quote Service owns entire flow
- Delay between pool update and quote refresh: <1s
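The invalidation chain above (WebSocket update → refresh pool state → drop derived quotes) can be sketched as a single Go handler. The `quoteCache` shape and the route keys are illustrative assumptions, not the service's actual types:

```go
package main

import "fmt"

// PoolState is a simplified stand-in for the service's pool record.
type PoolState struct {
	ReserveA, ReserveB uint64
}

// quoteCache holds pool state (Layer 1) plus quotes derived from it (Layer 2).
type quoteCache struct {
	pools  map[string]PoolState
	quotes map[string]map[string]uint64 // pool address -> route key -> cached quote
}

// onPoolUpdate applies a decoded WebSocket account update and drops every
// quote derived from that pool, so the next quote recomputes from fresh reserves.
func (c *quoteCache) onPoolUpdate(addr string, s PoolState) {
	c.pools[addr] = s      // 1. refresh pool state
	delete(c.quotes, addr) // 2. invalidate all Layer 2 quotes for this pool
}

func main() {
	c := &quoteCache{
		pools:  map[string]PoolState{},
		quotes: map[string]map[string]uint64{"P1": {"SOL->USDC": 42}},
	}
	c.onPoolUpdate("P1", PoolState{ReserveA: 100, ReserveB: 200})
	_, cached := c.quotes["P1"]
	fmt.Println(cached) // false: derived quotes were invalidated
}
```

Because the same service owns both layers, invalidation is a map delete in the update path rather than a cross-service signal, which is what makes the <1s refresh delay possible.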
WebSocket Efficiency:
- Only Quote Service maintains connections (all pool types)
- No duplicated infrastructure
- Simpler monitoring (one service, one set of metrics)
Estimated Improvements
| Metric | Before | After | Improvement |
|---|---|---|---|
| Hot pool access | 1-2ms | <1μs | 1000-2000x faster |
| Quote freshness | 10s | <1s | 10x fresher |
| Cache hit rate | 80% | 95%+ | Better locality |
| Debugging time | ~30min | ~10min | 3x faster |
| Deployment complexity | High (2 services) | Low (1 service) | 2x simpler |
| WebSocket connections | 50% of pools | 100% of pools | 2x coverage |
Real-World Impact
HFT Quote Pipeline (200ms total budget):
BEFORE:
Quote Service: 10ms (Redis read + pool math)
Scanner: 10ms
Planner: 100ms
Executor: 20ms
─────────────
Total: 140ms ✅
(60ms margin)
AFTER:
Quote Service: 1ms (in-memory + pool math) ← 10x faster!
Scanner: 10ms
Planner: 100ms
Executor: 20ms
─────────────
Total: 131ms ✅
(69ms margin, +9ms buffer)
Additional margin: The 9ms savings can be used for:
- More thorough validation in Planner
- Retry logic in Executor
- Additional arbitrage strategies
Configuration
Environment Variables
Pool Discovery Service (removed variables):
# ❌ REMOVED - No longer needed
# POOL_DISCOVERY_WEBSOCKET_ENABLED=true
# POOL_DISCOVERY_REDIS_WRITE_ENABLED=true
# POOL_DISCOVERY_REDIS_WRITE_INTERVAL=10s
Local Quote Service (new variables):
# Pool State Ownership
POOL_STATE_OWNERSHIP_ENABLED=true
# NATS Pool Discovery Subscription
POOL_DISCOVERY_NATS_SUBSCRIBE=true
POOL_DISCOVERY_NATS_SUBJECTS=pool.discovered.*
# WebSocket Configuration
POOL_WEBSOCKET_ENABLED=true
POOL_WEBSOCKET_URL=ws://rpc.solana.com
POOL_WEBSOCKET_RECONNECT_DELAY=5s
POOL_WEBSOCKET_MAX_RECONNECT=10
# RPC Backup Polling
POOL_RPC_BACKUP_ENABLED=true
POOL_RPC_BACKUP_INTERVAL=30s
POOL_RPC_BACKUP_ON_WEBSOCKET_FAILURE=true
# In-Memory Cache
POOL_INMEMORY_HOT_LIMIT=500
POOL_INMEMORY_COLD_TTL=60s
POOL_CACHE_STALENESS_THRESHOLD=60s
# Redis Persistence (Backup)
POOL_REDIS_PERSIST_ENABLED=true
POOL_REDIS_PERSIST_INTERVAL=10s
POOL_REDIS_PERSIST_TTL=24h
# Default Reserves
POOL_DEFAULT_RESERVE_UNITS=1000000000000 # 1 trillion
Docker Compose Updates
services:
  pool-discovery:
    environment:
      # Simplified configuration (discovery only)
      - ENVIRONMENT=production
      - RPC_ENDPOINTS=${RPC_ENDPOINTS}
      - NATS_URL=nats://nats:4222
      - SOLSCAN_PROXY_ENABLED=${SOLSCAN_PROXY_ENABLED}
      - POOL_DEFAULT_RESERVE_UNITS=1000000000000
      # ❌ Removed WebSocket and Redis write configs

  local-quote-service:
    environment:
      # Enhanced configuration (state owner)
      - POOL_STATE_OWNERSHIP_ENABLED=true
      - POOL_DISCOVERY_NATS_SUBSCRIBE=true
      - POOL_WEBSOCKET_ENABLED=true
      - POOL_RPC_BACKUP_ENABLED=true
      - POOL_INMEMORY_HOT_LIMIT=500
      - POOL_REDIS_PERSIST_ENABLED=true
      - NATS_URL=nats://nats:4222
      - REDIS_URL=redis://redis:6379/0
Monitoring & Alerts
New Metrics
Pool Discovery Events:
# NATS event publishing
pool_discovered_events_total{dex="raydium_amm"}
pool_discovery_nats_lag_seconds
pool_discovery_errors_total
WebSocket Subscriptions:
# Active subscriptions
pool_websocket_subscriptions_active{type="amm"}
pool_websocket_subscriptions_active{type="clmm"}
# Reconnections
pool_websocket_reconnections_total
pool_websocket_connection_uptime_seconds
# Update latency
pool_websocket_update_latency_seconds_p50
pool_websocket_update_latency_seconds_p95
Pool State Freshness:
# Age of pool data
pool_state_age_seconds_p50
pool_state_age_seconds_p95
pool_state_age_seconds_p99
# Stale pools
pool_state_stale_count
pool_state_needs_refresh_count
In-Memory Cache:
# Cache size
pool_cache_hot_count
pool_cache_cold_count
# Cache performance
pool_cache_hit_rate{tier="hot"}
pool_cache_hit_rate{tier="cold"}
pool_cache_miss_rate
# Cache operations
pool_cache_evictions_total{tier="hot"}
pool_cache_promotions_total # cold -> hot
Redis Persistence:
# Write performance
pool_redis_write_latency_seconds_p95
pool_redis_write_errors_total
pool_redis_write_success_total
# Recovery
pool_redis_recovery_pools_loaded
pool_redis_recovery_duration_seconds
New Alerts
Critical (PagerDuty):
groups:
  - name: pool_state_critical
    rules:
      - alert: PoolDiscoveryNATSDown
        expr: up{job="pool-discovery"} == 1 and on() nats_connection_status == 0
        for: 2m
        annotations:
          summary: "Pool Discovery cannot publish to NATS"
          impact: "New pools not discovered by Quote Service"
      - alert: PoolWebSocketDisconnected
        expr: sum(pool_websocket_subscriptions_active) == 0
        for: 1m
        annotations:
          summary: "No active WebSocket subscriptions (all pools stale)"
          impact: "Quote Service using stale pool data, quotes may be inaccurate"
      - alert: PoolStateAllStale
        expr: pool_state_stale_count > (pool_cache_hot_count + pool_cache_cold_count) * 0.5
        for: 5m
        annotations:
          summary: ">50% of pools are stale (>60s without update)"
          impact: "Quote accuracy degraded, potential arbitrage misses"
Warning (Slack):
- alert: PoolDiscoveryNATSLag
  expr: pool_discovery_nats_lag_seconds > 5
  for: 2m
  annotations:
    summary: "Pool discovery events delayed >5s"
    impact: "Quote Service has delayed pool data"
- alert: PoolWebSocketReconnecting
  expr: rate(pool_websocket_reconnections_total[5m]) > 0.1
  for: 5m
  annotations:
    summary: "WebSocket reconnecting frequently (>0.1/s)"
    impact: "Pool updates may be delayed during reconnections"
- alert: PoolCacheLowHitRate
  expr: pool_cache_hit_rate{tier="hot"} < 0.8
  for: 10m
  annotations:
    summary: "Hot pool cache hit rate <80%"
    impact: "Increased latency, more RPC calls"
- alert: PoolRedisPersistFailing
  expr: rate(pool_redis_write_errors_total[5m]) / (rate(pool_redis_write_errors_total[5m]) + rate(pool_redis_write_success_total[5m])) > 0.5
  for: 10m
  annotations:
    summary: "Redis persistence failing (>50% of writes erroring)"
    impact: "Crash recovery may be incomplete"
Grafana Dashboard
New Panels:
Row 1: Pool Discovery
- Pool discovered events (rate)
- NATS publish latency
- Solscan enrichment success rate
Row 2: WebSocket Health
- Active subscriptions by pool type
- Reconnection rate
- Update latency distribution
Row 3: Pool State Freshness
- Pool age distribution (histogram)
- Stale pool count over time
- Pools needing refresh
Row 4: Cache Performance
- Hot vs cold pool count
- Cache hit rate by tier
- Evictions and promotions
Row 5: Redis Persistence
- Write latency
- Error rate
- Last recovery timestamp
Conclusion
Summary
The Pool State Ownership Migration consolidates all pool state management in the Local Quote Service, eliminating the split-responsibility pattern with Pool Discovery Service. This architectural simplification delivers:
✅ 1000x faster pool access (in-memory <1μs vs Redis 1-2ms) ✅ 10x fresher pool data (WebSocket <1s vs Redis polling 10s) ✅ Single source of truth (no coordination between services) ✅ Simpler debugging (all pool updates in one service) ✅ Better resilience (WebSocket primary, RPC backup)
Migration Status
| Phase | Status | ETA |
|---|---|---|
| Phase 1: Pool Discovery Simplification | ✅ Complete | Week 1 |
| Phase 2: Local Quote Service Enhancement | 🔄 In Progress | Week 2 |
| Phase 3: In-Memory Cache Priority | ⏳ Pending | Week 3 |
| Phase 4: Production Migration | ⏳ Pending | Week 4 |
| Phase 5: Cleanup | ⏳ Pending | Week 5 |
Key Takeaways
- Simplicity wins: Moving from split to single responsibility reduced complexity by 50%
- In-memory is king: 1000x performance improvement by eliminating Redis from hot path
- WebSocket-first: Real-time updates provide sub-second freshness vs 10s polling
- Graceful degradation: RPC backup ensures system works even if WebSocket fails
- Default reserves critical: Never allow nil/zero reserves to avoid premature filtering
Related Documentation
- 30-QUOTE-SERVICE-ARCHITECTURE.md: Main Quote Service architecture
- 25-POOL-DISCOVERY-DESIGN.md: Updated Pool Discovery design (discovery-only)
- README.md: Service descriptions and quick start
Document Version: 1.0 Last Updated: January 1, 2026 Status: ✅ Active Migration Maintainer: Solution Architect Team
