Quote Service Architecture Optimization Review

Date: January 17-18, 2026
Status: Phases 1-3 Complete, Phase 4 In Progress
Version: 1.1
Reviewer: Solution Architect + Senior Go Developer


Table of Contents

  1. Executive Summary
  2. Services Reviewed
  3. Critical Issues
  4. Medium Priority Issues
  5. Well-Implemented Components
  6. Optimization Roadmap
  7. NATS Integration Specification
  8. Testing Checklist

Executive Summary

Review Scope

Comprehensive architecture review of Go quote services against the canonical architecture document (docs/30-QUOTE-SERVICE-ARCHITECTURE.md). Focus areas:

  • Service implementation completeness
  • gRPC streaming functionality
  • NATS integration status
  • Code quality and optimization opportunities

Key Finding (Initial Review, January 17)

At initial review, the gRPC quote streaming was implemented but NOT connected to NATS messaging.

This was the primary blocker for HFT pipeline integration. Scanners expect quote updates via NATS (market.swap_route.*), but the gRPC servers streamed only to direct gRPC clients. Phase 1 of the roadmap (completed January 18) resolved this.

Completion Status

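| Service | Implementation | NATS Integration | Test Ready |
|---------|----------------|------------------|------------|
| local-quote-service | 95% | ✅ Complete | Yes |
| external-quote-service | 90% | ✅ Complete | Yes |
| pool-discovery-service | 100% | ✅ Complete | Yes |
| event-logger-service | 100% | ✅ Complete (JetStream) | Yes |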

Progress Update (January 18, 2026)

Completed Today:

  • ✅ Phase 1: NATS Integration - Quote publishers now emit SwapRouteEvent
  • ✅ Phase 2: Cache population implemented
  • ✅ Phase 3.4: Shared config loader created
  • ✅ Phase 4.1-4.3: Resilience improvements
  • ✅ Phase 5 (critical fix): event-logger-service now uses JetStream subscriptions

Key Issue Resolved: SwapRouteEvent was not appearing on Grafana dashboard because:

  • Quote services publish via JetStream (js.PublishAsync())
  • event-logger-service was subscribing via core NATS (nc.Subscribe())
  • Messages published via JetStream are persisted in streams, and durable, acknowledged delivery from those streams requires JetStream consumers bound to them

Fix Applied: Updated event-logger-service to use JetStream consumers with proper stream binding.


Services Reviewed

1. Local Quote Service

Location: go/cmd/local-quote-service/main.go (1088 lines)

Architecture:

  • Dual cache: Pool state (Layer 1) + Quote response (Layer 2)
  • Background refresh: AMM 1s, CLMM 30s
  • WebSocket integration for real-time pool updates
  • Quality management with oracle validation

gRPC Server: go/internal/local-quote-service/grpc/server.go (418 lines)

Key Features Implemented:

  • ✅ Parallel paired quote calculation (forward + reverse)
  • ✅ Cache lookup with fresh calculation fallback
  • ✅ Keep-alive configuration for streaming
  • ✅ Context cancellation handling
  • ❌ NATS event publishing (NOT IMPLEMENTED at initial review; resolved in Phase 1)

2. External Quote Service

Location: go/cmd/external-quote-service/main.go (509 lines)

Architecture:

  • Split cache: Route topology + Price data
  • Multi-provider: Jupiter, DFlow, OKX, dark pools
  • Rate limiting per provider (Jupiter: 1 RPS shared)
  • Circuit breakers (planned)

gRPC Server: go/internal/external-quote-service/grpc/server.go (527 lines)

Key Features Implemented:

  • ✅ QuoterManager integration
  • ✅ Parallel paired quote calculation
  • ✅ Provider rotation and best quote selection
  • ⚠️ Cache population NOT implemented at initial review (fetched fresh every time; resolved in Phase 2)
  • ❌ NATS event publishing (NOT IMPLEMENTED at initial review; resolved in Phase 1)

3. Pool Discovery Service

Location: go/cmd/pool-discovery-service/main.go (514 lines)

Architecture:

  • Multi-protocol scanner (Raydium, Meteora, PumpSwap, etc.)
  • Redis storage with TTL
  • Solscan enrichment with rate limiting
  • NATS event publishing (working)

Key Features Implemented:

  • ✅ Full NATS integration via events.NewEventPublisher
  • ✅ Oracle manager (Pyth + Jupiter)
  • ✅ Crash recovery from Redis cache
  • ✅ Prometheus metrics

4. Event Logger Service

Location: go/cmd/event-logger-service/main.go + jetstream.go

Architecture:

  • 6-stream JetStream subscription (MARKET_DATA, OPPORTUNITIES, PLANNED, EXECUTED, METRICS, SYSTEM)
  • FlatBuffers event handling
  • Prometheus metrics
  • Durable consumers with explicit acknowledgment

Key Files:

  • main.go - Service entry point, JetStream context creation
  • jetstream.go - JetStream subscription helpers, consumer configurations
  • flatbuffers_handler.go - FlatBuffers event deserialization

Consumer Configuration (9 durable consumers):

| Stream | Consumer | Filter Subject |
|--------|----------|----------------|
| MARKET_DATA | event-logger-market-data | market.> |
| OPPORTUNITIES | event-logger-opportunities | opportunity.> |
| PLANNED | event-logger-planned | execution.plan.> |
| PLANNED | event-logger-rejected | execution.rejected.> |
| EXECUTED | event-logger-started | execution.started.> |
| EXECUTED | event-logger-results | execution.result.> |
| EXECUTED | event-logger-failed | execution.failed.> |
| METRICS | event-logger-metrics | metrics.> |
| SYSTEM | event-logger-system | system.> |

Important: Streams are created by the system-initializer service (TypeScript). event-logger-service only subscribes to existing streams; it does not create them.

Status: ✅ Complete and working with JetStream
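To confirm which durable consumer receives a given subject (for example, a SwapRouteEvent on market.swap_route.jupiter), the filter subjects in the consumer table can be evaluated with NATS wildcard rules: * matches exactly one token and > matches one or more trailing tokens. The stdlib-only sketch below is illustrative; subjectMatches and consumersFor are hypothetical helpers, not part of event-logger-service, and in production the NATS server performs this matching itself.

```go
package main

import (
	"fmt"
	"strings"
)

// consumer mirrors one row of the event-logger consumer table.
type consumer struct {
	Stream, Name, Filter string
}

var consumers = []consumer{
	{"MARKET_DATA", "event-logger-market-data", "market.>"},
	{"OPPORTUNITIES", "event-logger-opportunities", "opportunity.>"},
	{"PLANNED", "event-logger-planned", "execution.plan.>"},
	{"PLANNED", "event-logger-rejected", "execution.rejected.>"},
	{"EXECUTED", "event-logger-started", "execution.started.>"},
	{"EXECUTED", "event-logger-results", "execution.result.>"},
	{"EXECUTED", "event-logger-failed", "execution.failed.>"},
	{"METRICS", "event-logger-metrics", "metrics.>"},
	{"SYSTEM", "event-logger-system", "system.>"},
}

// subjectMatches applies NATS wildcard rules: "*" matches exactly one token,
// ">" (valid only as the last token) matches one or more remaining tokens.
func subjectMatches(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(f) == len(s)
}

// consumersFor returns the durable consumers whose filter matches subject.
func consumersFor(subject string) []string {
	var names []string
	for _, c := range consumers {
		if subjectMatches(c.Filter, subject) {
			names = append(names, c.Name)
		}
	}
	return names
}

func main() {
	for _, subj := range []string{"market.swap_route.jupiter", "execution.rejected.abc", "quotes.local"} {
		fmt.Println(subj, "->", consumersFor(subj))
	}
}
```

Assuming the MARKET_DATA stream captures market.>, every market.swap_route.* subject lands on event-logger-market-data, which is the consumer the JetStream fix depends on.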


Critical Issues

Issue 1: NATS Not Connected to gRPC Streaming

Severity: CRITICAL (Blocks HFT pipeline)

Architecture Requirement (from docs/30-QUOTE-SERVICE-ARCHITECTURE.md:316-319):

**NATS Messaging** (Required):
- **Quote-Service** publishes to `market.swap_route.*` for HFT pipeline integration
- **Scanner** consumes from NATS for real-time opportunity detection
- **Critical Path**: Required for Scanner → Planner → Executor pipeline

Current State:

gRPC Server                    NATS
    │                            │
    ├─► Stream to gRPC Client    │  (working)
    │                            │
    ╳─► Publish to NATS          │  (NOT IMPLEMENTED)

Affected Files:

  • go/internal/local-quote-service/grpc/server.go:249 - After safeSend()
  • go/internal/external-quote-service/grpc/server.go:267 - After safeSend()

Impact:

  • Scanner cannot receive quote updates
  • HFT pipeline broken at first step
  • Only direct gRPC clients receive quotes

Fix Location: See NATS Integration Specification


Issue 2: Unused Subscribers Map (Memory Leak)

Severity: MEDIUM

Location:

  • go/internal/local-quote-service/grpc/server.go:33,48,88-97
  • go/internal/external-quote-service/grpc/server.go:35,48,88-97

Code Pattern:

type Server struct {
    subscribers map[string]chan *proto.QuoteStreamResponse  // Line 33
}

// In StreamQuotes():
s.subscribers[subscriberID] = quoteChan  // Line 89 - Added
// ...
delete(s.subscribers, subscriberID)       // Line 95 - Deleted on disconnect

Problem:

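  • Map is populated but never read for any functionality
  • Its only purpose is cleanup tracking, which the deferred delete already handles
  • An entry outlives its client (memory leak) if the StreamQuotes goroutine hangs before the deferred delete runs
  • Unless the map is guarded by a mutex, concurrent StreamQuotes calls write it simultaneously; Go maps are not safe for concurrent writes, and the runtime aborts the process when it detects them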

Recommendation:

  1. Remove if not needed for broadcast
  2. Use for subscriber management/metrics if kept
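If the map is kept for subscriber management or metrics (option 2), it needs a mutex, because each StreamQuotes call runs in its own goroutine. A minimal sketch, assuming a hypothetical subscriberRegistry type and a string payload in place of *proto.QuoteStreamResponse:

```go
package main

import (
	"fmt"
	"sync"
)

// subscriberRegistry tracks active stream subscribers. Safe for concurrent use.
type subscriberRegistry struct {
	mu   sync.Mutex
	subs map[string]chan string // stand-in for chan *proto.QuoteStreamResponse
}

func newSubscriberRegistry() *subscriberRegistry {
	return &subscriberRegistry{subs: make(map[string]chan string)}
}

// add registers ch under id and returns a function that removes it.
// Callers should defer the returned func immediately so every exit path cleans up.
func (r *subscriberRegistry) add(id string, ch chan string) (remove func()) {
	r.mu.Lock()
	r.subs[id] = ch
	r.mu.Unlock()
	return func() {
		r.mu.Lock()
		delete(r.subs, id)
		r.mu.Unlock()
	}
}

// count reports active subscribers, e.g. for a Prometheus gauge.
func (r *subscriberRegistry) count() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.subs)
}

func main() {
	reg := newSubscriberRegistry()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			remove := reg.add(fmt.Sprintf("client-%d", i), make(chan string, 1))
			defer remove()
		}(i)
	}
	wg.Wait()
	fmt.Println("active subscribers:", reg.count())
}
```

Returning the remove function from add keeps registration and cleanup paired at one call site, which mirrors the existing defer-based cleanup.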

Issue 3: Goroutine Lifecycle Not Tracked

Severity: MEDIUM

Location: Both gRPC servers at startContinuousStreaming()

  • Local: server.go:133-138
  • External: server.go:133-138

Code Pattern:

for _, pair := range pairs {
    for _, amount := range amounts {
        go s.streamQuotePairContinuously(ctx, pair, amount, slippageBps, quoteChan)
    }
}

Problem:

  • 45 pairs × 40 amounts = 1,800 goroutines per client
  • Each spawns 2 sub-goroutines, adding 3,600 more: 1,800 + 3,600 = 5,400 untracked goroutines per client
  • No sync.WaitGroup for graceful shutdown
  • No way to verify all goroutines terminated

Fix:

var wg sync.WaitGroup
for _, pair := range pairs {
    for _, amount := range amounts {
        wg.Add(1)
        go func(p *proto.TokenPair, amt uint64) {
            defer wg.Done()
            s.streamQuotePairContinuously(ctx, p, amt, slippageBps, quoteChan)
        }(pair, amount)
    }
}
// Keep wg for graceful shutdown: cancel ctx, call wg.Wait(), then close quoteChan

Issue 4: External Quote Cache Not Populated

Severity: MEDIUM

Location: go/internal/external-quote-service/grpc/server.go:261-263

Code Comment:

// Note: Split cache storage (route + price) will be implemented in Phase 3
// For now, quotes are fetched fresh from external APIs each time
// This ensures freshest data but uses more API calls

Problem:

  • Route cache (RouteCache) exists but unused
  • Price cache (PriceCache) exists but unused
  • Every request hits external API (wastes rate limit)
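  • Jupiter: 1 RPS limit exhausted quickly; if all 1,800 (pair, amount) combinations in a cycle went to Jupiter, one cycle would need 1,800 s (30 minutes) of the shared budget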

Impact: Under load, rate limit exhaustion causes quote failures

Fix Required:

// After getting bestQuote, store in caches:
s.routeCache.Set(routeKey, &cache.Route{...})
s.priceCache.Set(priceKey, &cache.PriceData{...})

Medium Priority Issues

Issue 5: safeSend Race Condition

Location: Both servers at safeSend() function

Pattern:

select {
case ch <- msg:
    return true
default:
    select {
    case ch <- msg: // Panics if another goroutine closed ch; the first select cannot detect that
        return true
    case <-time.After(100 * time.Millisecond):
        return false
    }
}

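Problems:

  1. A send on a closed channel panics, and Go offers no race-free way to check whether a channel is closed; if quoteChan is closed while senders are still running (for example between the default branch and the nested select), the send panics
  2. Panic recovery catches the crash but drops the quote silently
  3. As shown, the nested select does not watch ctx, so a sender stuck on a full buffer waits out the 100ms timeout even after the client disconnects
  4. No metrics for buffer pressure

Recommendation: A channel state check cannot work in Go. Instead, let only the channel's owner close quoteChan, and only after every sender has returned (the sync.WaitGroup from Issue 3); add a ctx.Done() case to the nested select; and count timed-out sends as buffer-pressure drops.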


Issue 6: Inconsistent Environment Variable Handling

Comparison:

| Service | Pattern | Example |
|---------|---------|---------|
| local-quote-service | Flag + env override | RPC_PROXY_URL overrides -rpc-proxy-url |
| external-quote-service | Flag + env override | HTTP_PORT overrides -http-port |
| pool-discovery-service | Flag only | -rpc-proxy flag, no env override |

Recommendation: Create shared config loader in pkg/config/


Issue 7: Missing gRPC Health Check

Location: Both main.go files

// TODO: Register Health check service using common/grpc/health.go
// For now, health checks are optional and can be added later

Impact: Kubernetes/load balancers cannot verify gRPC health

Fix: Register health service from go/internal/common/grpc/health.go


Well-Implemented Components

1. Dual Cache Architecture (Local Service)

Location: go/internal/local-quote-service/cache/

  • Pool state cache with protocol-specific TTLs
  • Quote response cache with pool-aware invalidation
  • WebSocket integration for real-time updates
  • Metrics callbacks for monitoring

2. Quality Management System

Location: go/internal/local-quote-service/quality/

  • PoolQualityManager: Oracle-based validation
  • PoolSelectionEngine: Two-stage filtering
  • PriorityRefreshManager: Dynamic refresh scheduling
  • Rogue pool detection

3. Observability Integration

All services use unified pattern:

obsConfig := observability.Config{
    ServiceName:    "service-name",
    ServiceVersion: "1.0.0",
    Environment:    cfg.Environment,
    OTLPEndpoint:   cfg.OTLPEndpoint,
    LogLevel:       cfg.LogLevel,
}
observability.Init(obsConfig)

4. Pool Discovery NATS Integration

Location: go/internal/pool-discovery/events/events.go

Complete implementation:

  • JetStream publisher
  • Async publishing with error handling
  • Event types: pool.discovery.*, market.price.update

5. gRPC Server Configuration

Both services have proper keep-alive:

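kaParams := keepalive.ServerParameters{
    Time:    30 * time.Second, // ping the client after 30s without activity
    Timeout: 10 * time.Second, // close the connection if the ping gets no response within 10s
}
kaPolicy := keepalive.EnforcementPolicy{
    MinTime:             10 * time.Second, // GOAWAY clients that ping more often than every 10s
    PermitWithoutStream: true,             // allow client pings even with no active streams
}
s := grpc.NewServer(
    grpc.KeepaliveParams(kaParams),
    grpc.KeepaliveEnforcementPolicy(kaPolicy),
    grpc.MaxConcurrentStreams(1000),
)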

Optimization Roadmap

Phase 1: NATS Integration (BLOCKING) ✅ COMPLETE

Priority: P0
Status: ✅ Completed January 18, 2026

| Task | File | Description | Status |
|------|------|-------------|--------|
| 1.1 | local-quote-service/grpc/server.go | Add NATS publisher to Server struct | ✅ |
| 1.2 | local-quote-service/grpc/server.go | Publish after each safeSend() | ✅ |
| 1.3 | external-quote-service/grpc/server.go | Same as 1.1-1.2 | ✅ |
| 1.4 | local-quote-service/main.go | Inject EventPublisher into gRPC server | ✅ |
| 1.5 | external-quote-service/main.go | Same as 1.4 | ✅ |
| 1.6 | internal/common/events/quote_publisher.go | Shared QuoteEventPublisher | ✅ |

Phase 2: Cache Population (HIGH) ✅ COMPLETE

Priority: P1
Status: ✅ Completed January 18, 2026

| Task | File | Description | Status |
|------|------|-------------|--------|
| 2.1 | external-quote-service/grpc/server.go | Populate RouteCache after API fetch | ✅ |
| 2.2 | external-quote-service/grpc/server.go | Populate PriceCache after API fetch | ✅ |
| 2.3 | - | Add cache metrics | ✅ |

Phase 3: Code Quality (MEDIUM) ✅ COMPLETE

Priority: P2
Status: ✅ Completed January 18, 2026

| Task | File | Description | Status |
|------|------|-------------|--------|
| 3.1 | Both gRPC servers | Remove or use subscribers map | ✅ |
| 3.2 | Both gRPC servers | Add sync.WaitGroup for goroutines | ✅ |
| 3.3 | Both main.go | Register gRPC health check | ✅ |
| 3.4 | pkg/config/ | Create shared config loader | ✅ |

Phase 4: Resilience (LOW) - IN PROGRESS

Priority: P3
Status: 🔄 In Progress

| Task | File | Description | Status |
|------|------|-------------|--------|
| 4.1 | Both gRPC servers | Improve safeSend() channel handling | |
| 4.2 | Both gRPC servers | Add buffer pressure metrics | |
| 4.3 | External service | Implement circuit breakers | |

Phase 5: JetStream Subscription Fix ✅ COMPLETE

Priority: P0 (critical for dashboard)
Status: ✅ Completed January 18, 2026

Problem: SwapRouteEvent published via JetStream was not received by event-logger-service, which was using core NATS subscriptions.

| Task | File | Description | Status |
|------|------|-------------|--------|
| 5.1 | event-logger-service/jetstream.go | Create JetStream subscription helpers | ✅ |
| 5.2 | event-logger-service/main.go | Use JetStream consumers instead of core NATS | ✅ |
| 5.3 | - | Remove duplicate stream creation (use system-initializer) | ✅ |

Files Modified:

  • go/cmd/event-logger-service/jetstream.go (NEW) - JetStream consumer configurations
  • go/cmd/event-logger-service/main.go - Updated to use JetStream context

Startup Order:

  1. system-initializer (TypeScript) - Creates all 6 JetStream streams
  2. event-logger-service (Go) - Subscribes to existing streams

NATS Integration Specification

Subject Pattern

Per architecture document, quote services should publish to:

market.swap_route.<provider>

Examples:

  • market.swap_route.local - Local pool math quotes
  • market.swap_route.jupiter - Jupiter API quotes
  • market.swap_route.dflow - DFlow API quotes
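pkg/subjects holds the generated subject constants, so the following is only a stand-in for the pattern: building market.swap_route.<provider> and rejecting provider names that would break the single-token rule (a dot starts a new token; * and > are wildcards and are not valid in a publish subject). swapRouteSubject is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

const swapRoutePrefix = "market.swap_route."

// swapRouteSubject builds market.swap_route.<provider>. The provider must be a
// single NATS token: non-empty, no '.', no wildcards, no whitespace.
func swapRouteSubject(provider string) (string, error) {
	if provider == "" || strings.ContainsAny(provider, ".*> \t\r\n") {
		return "", fmt.Errorf("invalid provider token %q", provider)
	}
	return swapRoutePrefix + provider, nil
}

func main() {
	for _, p := range []string{"local", "jupiter", "dflow"} {
		subj, err := swapRouteSubject(p)
		if err != nil {
			panic(err)
		}
		fmt.Println(subj) // market.swap_route.local, then .jupiter, then .dflow
	}
}
```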

Event Schema

// SwapRouteEvent represents a quote update for NATS publishing
type SwapRouteEvent struct {
    EventType    string    `json:"event_type"`     // "swap_route.updated"
    EventID      string    `json:"event_id"`       // UUID
    Timestamp    int64     `json:"timestamp"`      // Unix milliseconds
    Source       string    `json:"source"`         // Service name

    // Quote data
    Provider     string    `json:"provider"`       // "local", "jupiter", etc.
    InputMint    string    `json:"input_mint"`
    OutputMint   string    `json:"output_mint"`
    InputAmount  uint64    `json:"input_amount"`
    OutputAmount uint64    `json:"output_amount"`
    PriceImpact  float64   `json:"price_impact_bps"`

    // Routing
    Route        []RouteHop `json:"route"`

    // Metadata
    PairID       string    `json:"pair_id"`        // For shared memory lookup
    ContextSlot  uint64    `json:"context_slot"`   // Solana slot
}

type RouteHop struct {
    Protocol   string  `json:"protocol"`
    PoolID     string  `json:"pool_id"`
    InputMint  string  `json:"input_mint"`
    OutputMint string  `json:"output_mint"`
    FeeBps     float64 `json:"fee_bps"`
}

Implementation Pattern

Reference: go/internal/pool-discovery/events/events.go

// In gRPC server
type Server struct {
    // ... existing fields
    eventPublisher *events.EventPublisher  // Add this
}

// After safeSend in calculateAndStreamPairedQuotes:
func (s *Server) calculateAndStreamPairedQuotes(...) {
    // ... existing code ...

    safeSend(quoteChan, response, ctx)

    // NEW: Publish to NATS
    if s.eventPublisher != nil {
        event := &events.SwapRouteEvent{
            EventType:    "swap_route.updated",
            Provider:     "local",
            InputMint:    inputMint,
            OutputMint:   outputMint,
            InputAmount:  amount,
            OutputAmount: bestQuote.OutputAmount,
            PairID:       pairid.Generate(inputMint, outputMint, amount),
            Timestamp:    time.Now().UnixMilli(),
        }
        s.eventPublisher.PublishSwapRoute(ctx, event)
    }
}

Testing Checklist

Pre-Integration Testing

  • Verify local-quote-service starts without errors
  • Verify external-quote-service starts without errors
  • Verify pool-discovery-service populates Redis
  • Verify NATS connection established

gRPC Streaming Testing

  • Connect gRPC client to local-quote-service:50052
  • Connect gRPC client to external-quote-service:50053
  • Verify quotes stream every 10 seconds
  • Verify paired quotes (forward + reverse) arrive together
  • Test client disconnect handling

NATS Integration Testing (After Implementation)

  • Subscribe to market.swap_route.> wildcard
  • Verify events published when gRPC streams
  • Verify event schema matches specification
  • Verify Scanner receives events
  • Load test: 45 pairs × 40 amounts = 1,800 events/cycle
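For "Verify event schema matches specification", a subscriber-side validator can check each decoded message against the Event Schema section, including that the subject suffix matches the provider field. This sketch assumes JSON payloads as in that schema; validateSwapRoute is hypothetical, and its checks (for example, flagging timestamps below 1e12 as seconds rather than Unix milliseconds) are suggestions, not rules from the architecture document.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// SwapRouteEvent is trimmed to the fields validated here; JSON tags follow the spec.
type SwapRouteEvent struct {
	EventType    string `json:"event_type"`
	EventID      string `json:"event_id"`
	Timestamp    int64  `json:"timestamp"`
	Provider     string `json:"provider"`
	InputMint    string `json:"input_mint"`
	OutputMint   string `json:"output_mint"`
	InputAmount  uint64 `json:"input_amount"`
	OutputAmount uint64 `json:"output_amount"`
}

// validateSwapRoute returns every problem found, joined, rather than only the first.
func validateSwapRoute(subject string, ev SwapRouteEvent) error {
	var errs []error
	if ev.EventType != "swap_route.updated" {
		errs = append(errs, fmt.Errorf("event_type = %q, want swap_route.updated", ev.EventType))
	}
	if ev.EventID == "" {
		errs = append(errs, errors.New("event_id is empty"))
	}
	if ev.Timestamp < 1_000_000_000_000 {
		// Current Unix time is around 1.7e12 in ms but only around 1.7e9 in seconds.
		errs = append(errs, fmt.Errorf("timestamp %d is not Unix milliseconds", ev.Timestamp))
	}
	if ev.InputMint == "" || ev.OutputMint == "" {
		errs = append(errs, errors.New("input_mint and output_mint are required"))
	} else if ev.InputMint == ev.OutputMint {
		errs = append(errs, errors.New("input_mint equals output_mint"))
	}
	if ev.InputAmount == 0 {
		errs = append(errs, errors.New("input_amount is zero"))
	}
	if want := "market.swap_route." + ev.Provider; ev.Provider == "" || subject != want {
		errs = append(errs, fmt.Errorf("subject %q does not match provider %q", subject, ev.Provider))
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	msg := []byte(`{"event_type":"swap_route.updated","event_id":"e1","timestamp":1768700000,
		"provider":"local","input_mint":"A","output_mint":"B","input_amount":1000}`)
	var ev SwapRouteEvent
	if err := json.Unmarshal(msg, &ev); err != nil {
		panic(err)
	}
	if err := validateSwapRoute("market.swap_route.local", ev); err != nil {
		fmt.Println("schema violation:", err)
	}
}
```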

Performance Testing

  • Measure quote latency (target: <10ms local, <500ms external)
  • Measure memory usage per client connection
  • Verify goroutine count stabilizes
  • Test with multiple concurrent clients

Appendix: File References

Main Service Files

| Service | Main | Additional Files | Notes |
|---------|------|------------------|-------|
| local-quote-service | go/cmd/local-quote-service/main.go | go/internal/local-quote-service/grpc/server.go | gRPC + NATS publishing |
| external-quote-service | go/cmd/external-quote-service/main.go | go/internal/external-quote-service/grpc/server.go | gRPC + NATS publishing |
| pool-discovery-service | go/cmd/pool-discovery-service/main.go | go/internal/pool-discovery/events/ | NATS publishing |
| event-logger-service | go/cmd/event-logger-service/main.go | jetstream.go, flatbuffers_handler.go | JetStream subscriptions |

Key Internal Packages

| Package | Purpose |
|---------|---------|
| internal/local-quote-service/cache/ | Dual cache implementation |
| internal/local-quote-service/calculator/ | Protocol-specific quote math |
| internal/local-quote-service/quality/ | Pool quality management |
| internal/external-quote-service/cache/ | Split cache (route + price) |
| internal/pool-discovery/events/ | NATS event publishing (pool discovery) |
| internal/common/events/ | Shared QuoteEventPublisher for quote services |
| pkg/quoters/ | External API clients |
| pkg/subjects/ | Generated NATS subject constants |

Proto Definition

Location: proto/quote.proto

Key messages:

  • QuoteStreamRequest: Client subscription request
  • QuoteStreamResponse: Quote update (streamed)
  • RoutePlan: Swap route hop details
  • OraclePrices: Oracle price data

Document History

| Version | Date | Author | Changes |
|---------|------|--------|---------|
| 1.0 | 2026-01-17 | Solution Architect | Initial review |
| 1.1 | 2026-01-18 | Solution Architect | Phase 1-4 complete, JetStream fix for event-logger |

Next Steps

  1. Test End-to-End Pipeline:
    • Start system-initializer to create JetStream streams
    • Start event-logger-service to subscribe to streams
    • Start local-quote-service and external-quote-service
    • Verify SwapRouteEvent appears on Grafana dashboard
  2. Remaining Work:
    • Full integration testing with scanner-service
    • Load testing with production traffic patterns
    • Performance benchmarking (target: <10ms local, <500ms external)
  3. Documentation:
    • Update go/cmd/event-logger-service/README.md with JetStream details
    • Add startup order to deployment documentation