Quote Service Architecture Optimization Review
Date: January 17-18, 2026
Status: Phase 1-3 Complete, Phase 4 In Progress
Version: 1.1
Reviewer: Solution Architect + Senior Go Developer
Table of Contents
- Executive Summary
- Services Reviewed
- Critical Issues
- Medium Priority Issues
- Well-Implemented Components
- Optimization Roadmap
- NATS Integration Specification
- Testing Checklist
Executive Summary
Review Scope
Comprehensive architecture review of Go quote services against the canonical architecture document (docs/30-QUOTE-SERVICE-ARCHITECTURE.md). Focus areas:
- Service implementation completeness
- gRPC streaming functionality
- NATS integration status
- Code quality and optimization opportunities
Key Finding
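At the initial review (January 17), gRPC quote streaming was implemented but NOT connected to NATS messaging.
This was the primary blocker for HFT pipeline integration: scanners expect quote updates via NATS (market.swap_route.*), but the gRPC servers only streamed to direct gRPC clients. Phase 1 of the roadmap has since closed this gap; the findings under Services Reviewed and Critical Issues describe the state as first reviewed.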
Completion Status
| Service | Implementation | NATS Integration | Test Ready |
|---|---|---|---|
| local-quote-service | 95% | ✅ Complete | Yes |
| external-quote-service | 90% | ✅ Complete | Yes |
| pool-discovery-service | 100% | ✅ Complete | Yes |
| event-logger-service | 100% | ✅ Complete (JetStream) | Yes |
Progress Update (January 18, 2026)
Completed Today:
- ✅ Phase 1: NATS Integration - Quote publishers now emit SwapRouteEvent
- ✅ Phase 2: Cache population implemented
- ✅ Phase 3.4: Shared config loader created
- ✅ Phase 4.1-4.3: Resilience improvements
- ✅ Critical Fix: event-logger-service now uses JetStream subscriptions
Key Issue Resolved: SwapRouteEvent was not appearing on Grafana dashboard because:
- Quote services publish via JetStream (js.PublishAsync())
- event-logger-service was subscribing via core NATS (nc.Subscribe())
- JetStream messages require proper stream binding to be received
Fix Applied: Updated event-logger-service to use JetStream consumers with proper stream binding.
Services Reviewed
1. Local Quote Service
Location: go/cmd/local-quote-service/main.go (1088 lines)
Architecture:
- Dual cache: Pool state (Layer 1) + Quote response (Layer 2)
- Background refresh: AMM 1s, CLMM 30s
- WebSocket integration for real-time pool updates
- Quality management with oracle validation
gRPC Server: go/internal/local-quote-service/grpc/server.go (418 lines)
Key Features Implemented:
- ✅ Parallel paired quote calculation (forward + reverse)
- ✅ Cache lookup with fresh calculation fallback
- ✅ Keep-alive configuration for streaming
- ✅ Context cancellation handling
- ❌ NATS event publishing (NOT IMPLEMENTED)
2. External Quote Service
Location: go/cmd/external-quote-service/main.go (509 lines)
Architecture:
- Split cache: Route topology + Price data
- Multi-provider: Jupiter, DFlow, OKX, dark pools
- Rate limiting per provider (Jupiter: 1 RPS shared)
- Circuit breakers (planned)
gRPC Server: go/internal/external-quote-service/grpc/server.go (527 lines)
Key Features Implemented:
- ✅ QuoterManager integration
- ✅ Parallel paired quote calculation
- ✅ Provider rotation and best quote selection
- ⚠️ Cache population NOT implemented (fetches fresh every time)
- ❌ NATS event publishing (NOT IMPLEMENTED)
3. Pool Discovery Service
Location: go/cmd/pool-discovery-service/main.go (514 lines)
Architecture:
- Multi-protocol scanner (Raydium, Meteora, PumpSwap, etc.)
- Redis storage with TTL
- Solscan enrichment with rate limiting
- NATS event publishing (working)
Key Features Implemented:
- ✅ Full NATS integration via events.NewEventPublisher
- ✅ Oracle manager (Pyth + Jupiter)
- ✅ Crash recovery from Redis cache
- ✅ Prometheus metrics
4. Event Logger Service
Location: go/cmd/event-logger-service/main.go + jetstream.go
Architecture:
- 6-stream JetStream subscription (MARKET_DATA, OPPORTUNITIES, PLANNED, EXECUTED, METRICS, SYSTEM)
- FlatBuffers event handling
- Prometheus metrics
- Durable consumers with explicit acknowledgment
Key Files:
- main.go - Service entry point, JetStream context creation
- jetstream.go - JetStream subscription helpers, consumer configurations
- flatbuffers_handler.go - FlatBuffers event deserialization
Consumer Configuration (9 durable consumers):
| Stream | Consumer | Filter Subject |
|---|---|---|
| MARKET_DATA | event-logger-market-data | market.> |
| OPPORTUNITIES | event-logger-opportunities | opportunity.> |
| PLANNED | event-logger-planned | execution.plan.> |
| PLANNED | event-logger-rejected | execution.rejected.> |
| EXECUTED | event-logger-started | execution.started.> |
| EXECUTED | event-logger-results | execution.result.> |
| EXECUTED | event-logger-failed | execution.failed.> |
| METRICS | event-logger-metrics | metrics.> |
| SYSTEM | event-logger-system | system.> |
Important: Streams are created by system-initializer service (TypeScript). Event-logger-service only subscribes to existing streams.
Status: ✅ Complete and working with JetStream
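To check that a published subject lands in the intended consumer, the filter subjects above can be exercised offline. The following is a minimal sketch, assuming standard NATS wildcard semantics ("*" matches exactly one token, ">" matches one or more trailing tokens). The names consumerSpec, subjectMatches, and consumerFor are hypothetical helpers for illustration and are not part of event-logger-service.

```go
package main

import (
	"fmt"
	"strings"
)

// consumerSpec mirrors one row of the consumer configuration table.
// Hypothetical type, used only for this illustration.
type consumerSpec struct {
	Stream, Durable, Filter string
}

var consumers = []consumerSpec{
	{"MARKET_DATA", "event-logger-market-data", "market.>"},
	{"OPPORTUNITIES", "event-logger-opportunities", "opportunity.>"},
	{"PLANNED", "event-logger-planned", "execution.plan.>"},
	{"PLANNED", "event-logger-rejected", "execution.rejected.>"},
	{"EXECUTED", "event-logger-started", "execution.started.>"},
	{"EXECUTED", "event-logger-results", "execution.result.>"},
	{"EXECUTED", "event-logger-failed", "execution.failed.>"},
	{"METRICS", "event-logger-metrics", "metrics.>"},
	{"SYSTEM", "event-logger-system", "system.>"},
}

// subjectMatches reports whether subject satisfies a NATS filter subject.
// "*" matches exactly one token; ">" is only valid as the last filter token
// and matches one or more remaining tokens.
func subjectMatches(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(f) == len(s)
}

// consumerFor returns the first consumer whose filter matches subject.
func consumerFor(subject string) (consumerSpec, bool) {
	for _, c := range consumers {
		if subjectMatches(c.Filter, subject) {
			return c, true
		}
	}
	return consumerSpec{}, false
}

func main() {
	c, ok := consumerFor("market.swap_route.jupiter")
	fmt.Println(c.Stream, c.Durable, ok)
}
```

Under these assumptions, every market.swap_route.<provider> subject falls under the market.> filter of event-logger-market-data, which is the consumer the dashboard fix depends on.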
Critical Issues
Issue 1: NATS Not Connected to gRPC Streaming
Severity: CRITICAL (Blocks HFT pipeline)
Architecture Requirement (from docs/30-QUOTE-SERVICE-ARCHITECTURE.md:316-319):
**NATS Messaging** (Required):
- **Quote-Service** publishes to `market.swap_route.*` for HFT pipeline integration
- **Scanner** consumes from NATS for real-time opportunity detection
- **Critical Path**: Required for Scanner → Planner → Executor pipeline
Current State:
gRPC Server NATS
│ │
├─► Stream to gRPC Client │ (working)
│ │
╳─► Publish to NATS │ (NOT IMPLEMENTED)
Affected Files:
- go/internal/local-quote-service/grpc/server.go:249 - After safeSend()
- go/internal/external-quote-service/grpc/server.go:267 - After safeSend()
Impact:
- Scanner cannot receive quote updates
- HFT pipeline broken at first step
- Only direct gRPC clients receive quotes
Fix Location: See NATS Integration Specification
Issue 2: Unused Subscribers Map (Memory Leak)
Severity: MEDIUM
Location:
- go/internal/local-quote-service/grpc/server.go:33,48,88-97
- go/internal/external-quote-service/grpc/server.go:35,48,88-97
Code Pattern:
type Server struct {
subscribers map[string]chan *proto.QuoteStreamResponse // Line 33
}
// In StreamQuotes():
s.subscribers[subscriberID] = quoteChan // Line 89 - Added
// ...
delete(s.subscribers, subscriberID) // Line 95 - Deleted on disconnect
Problem:
- Map created but never used for actual functionality
- Only purpose is tracking for cleanup (already handled by defer)
- Memory leak if goroutines hang before reaching delete
Recommendation:
- Remove if not needed for broadcast
- Use for subscriber management/metrics if kept
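If the map is kept, one option is to wrap it in a small mutex-guarded registry so it serves both broadcast and metrics. The sketch below is illustrative only: registry, quote, add, count, and broadcast are hypothetical names, and quote stands in for *proto.QuoteStreamResponse.

```go
package main

import (
	"fmt"
	"sync"
)

// quote is a stand-in for *proto.QuoteStreamResponse (assumption).
type quote struct{ PairID string }

// registry guards the subscribers map so it can be read for metrics and
// broadcast while streams connect and disconnect concurrently.
type registry struct {
	mu   sync.Mutex
	subs map[string]chan quote
}

func newRegistry() *registry {
	return &registry{subs: make(map[string]chan quote)}
}

// add registers a subscriber and returns a cleanup function intended to be
// deferred immediately, so removal happens on every exit path.
func (r *registry) add(id string, ch chan quote) (remove func()) {
	r.mu.Lock()
	r.subs[id] = ch
	r.mu.Unlock()
	return func() {
		r.mu.Lock()
		delete(r.subs, id)
		r.mu.Unlock()
	}
}

// count returns the number of active subscribers (usable as a gauge metric).
func (r *registry) count() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.subs)
}

// broadcast offers q to every subscriber without blocking and returns how
// many accepted it; subscribers with full buffers are skipped.
func (r *registry) broadcast(q quote) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	delivered := 0
	for _, ch := range r.subs {
		select {
		case ch <- q:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	r := newRegistry()
	ch := make(chan quote, 1)
	remove := r.add("client-1", ch)
	defer remove()
	fmt.Println("subscribers:", r.count())
	fmt.Println("delivered:", r.broadcast(quote{PairID: "SOL/USDC"}))
}
```

Deferring the returned cleanup function right after add is what addresses the leak concern: the entry is removed even if the stream handler returns early.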
Issue 3: Goroutine Lifecycle Not Tracked
Severity: MEDIUM
Location: Both gRPC servers at startContinuousStreaming()
- Local: server.go:133-138
- External: server.go:133-138
Code Pattern:
for _, pair := range pairs {
for _, amount := range amounts {
go s.streamQuotePairContinuously(ctx, pair, amount, slippageBps, quoteChan)
}
}
Problem:
- 45 pairs × 40 amounts = 1,800 goroutines per client
- Each spawns 2 sub-goroutines (3,600 more), for 5,400 untracked goroutines in total
- No sync.WaitGroup for graceful shutdown
- No way to verify all goroutines terminated
Fix:
var wg sync.WaitGroup
for _, pair := range pairs {
for _, amount := range amounts {
wg.Add(1)
go func(p *proto.TokenPair, amt uint64) {
defer wg.Done()
s.streamQuotePairContinuously(ctx, p, amt, slippageBps, quoteChan)
}(pair, amount)
}
}
// Store wg for graceful shutdown
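Storing the WaitGroup only helps if shutdown actually waits on it with a bound. A minimal sketch of that step follows, assuming each streaming goroutine exits when its context is cancelled. streamGroup, goStream, wait, runAndShutdown, and the grace value are hypothetical and not taken from the services.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// grace is a placeholder shutdown budget, not a value from the review.
const grace = 2 * time.Second

// streamGroup tracks streaming goroutines so shutdown can be verified.
type streamGroup struct {
	wg sync.WaitGroup
}

// goStream runs fn in a goroutine that is counted by the WaitGroup.
func (g *streamGroup) goStream(ctx context.Context, fn func(context.Context)) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		fn(ctx)
	}()
}

// wait blocks until every tracked goroutine has returned or timeout elapses.
// On timeout the helper goroutine stays parked until the stragglers finish.
func (g *streamGroup) wait(timeout time.Duration) error {
	done := make(chan struct{})
	go func() {
		g.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		return errors.New("timed out waiting for streaming goroutines")
	}
}

// runAndShutdown starts n goroutines that block until cancellation, then
// cancels the context and waits up to timeout for all of them to exit.
func runAndShutdown(n int, timeout time.Duration) error {
	ctx, cancel := context.WithCancel(context.Background())
	var g streamGroup
	for i := 0; i < n; i++ {
		g.goStream(ctx, func(ctx context.Context) { <-ctx.Done() })
	}
	cancel()
	return g.wait(timeout)
}

func main() {
	const pairs, amounts = 45, 40 // 1,800 goroutines per client
	fmt.Println("shutdown error:", runAndShutdown(pairs*amounts, grace))
}
```

A non-nil error from wait is the signal that some goroutine ignored cancellation, which is the condition the review says cannot currently be verified.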
Issue 4: External Quote Cache Not Populated
Severity: MEDIUM
Location: go/internal/external-quote-service/grpc/server.go:261-263
Code Comment:
// Note: Split cache storage (route + price) will be implemented in Phase 3
// For now, quotes are fetched fresh from external APIs each time
// This ensures freshest data but uses more API calls
Problem:
- Route cache (RouteCache) exists but unused
- Price cache (PriceCache) exists but unused
- Every request hits external API (wastes rate limit)
- Jupiter: 1 RPS limit exhausted quickly
Impact: Under load, rate limit exhaustion causes quote failures
Fix Required:
// After getting bestQuote, store in caches:
s.routeCache.Set(routeKey, &cache.Route{...})
s.priceCache.Set(priceKey, &cache.PriceData{...})
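The effect of populating a cache on API call volume can be sketched with a small read-through cache. This is not the RouteCache or PriceCache implementation: quoteCache, getOrFetch, and defaultTTL are hypothetical, and the TTL value is a placeholder.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// defaultTTL is a placeholder freshness window, not a value from the review.
const defaultTTL = 2 * time.Second

type cachedQuote struct {
	outputAmount uint64
	expiresAt    time.Time
}

// quoteCache is a minimal TTL cache keyed by an opaque string.
type quoteCache struct {
	mu      sync.Mutex
	ttl     time.Duration
	entries map[string]cachedQuote
}

func newQuoteCache(ttl time.Duration) *quoteCache {
	return &quoteCache{ttl: ttl, entries: make(map[string]cachedQuote)}
}

// Set stores a value that stays fresh for the cache TTL.
func (c *quoteCache) Set(key string, out uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[key] = cachedQuote{outputAmount: out, expiresAt: time.Now().Add(c.ttl)}
}

// Get returns the cached value only while it is still fresh.
func (c *quoteCache) Get(key string) (uint64, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.entries[key]
	if !ok || !time.Now().Before(e.expiresAt) {
		return 0, false
	}
	return e.outputAmount, true
}

// getOrFetch serves from cache when possible and calls fetch (the external
// API) only on a miss. Concurrent misses are not de-duplicated here.
func (c *quoteCache) getOrFetch(key string, fetch func() (uint64, error)) (uint64, error) {
	if v, ok := c.Get(key); ok {
		return v, nil
	}
	v, err := fetch()
	if err != nil {
		return 0, err
	}
	c.Set(key, v)
	return v, nil
}

func main() {
	calls := 0
	fetch := func() (uint64, error) { calls++; return 1_000_000, nil }
	c := newQuoteCache(defaultTTL)
	c.getOrFetch("pair-a:jupiter", fetch)
	c.getOrFetch("pair-a:jupiter", fetch)
	fmt.Println("API calls:", calls)
}
```

With a 1 RPS provider limit, any TTL longer than the streaming interval removes repeat calls for the same key, at the cost of serving quotes up to one TTL old.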
Medium Priority Issues
Issue 5: safeSend Race Condition
Location: Both servers at safeSend() function
Pattern:
select {
case ch <- msg:
return true
default:
select {
case ch <- msg: // Race: channel could close between checks
return true
case <-time.After(100 * time.Millisecond):
return false
}
}
Problems:
- Channel can close between default and nested select
- Panic recovery catches crash but loses data silently
- No metrics for buffer pressure
Recommendation: Add channel state check or use sync primitives
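One way to act on this recommendation is to stop closing the data channel while senders are alive and signal disconnect through a separate done channel (for example ctx.Done()). The sketch below assumes that ownership rule. safeSend here is a rewritten illustration rather than the existing function, and quoteUpdate, sendResult, and dropped are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// quoteUpdate is a stand-in for *proto.QuoteStreamResponse (assumption).
type quoteUpdate struct{ PairID string }

type sendResult int

const (
	sent sendResult = iota
	timedOut
	cancelled
)

// sendTimeout mirrors the 100 ms wait used in the pattern above.
const sendTimeout = 100 * time.Millisecond

// dropped counts messages abandoned because the buffer stayed full; it can
// back a buffer-pressure metric.
var dropped atomic.Int64

// safeSend never relies on recover: ch is assumed to stay open while senders
// exist, and the receiver signals disconnect by closing done instead.
func safeSend(done <-chan struct{}, ch chan<- quoteUpdate, msg quoteUpdate, timeout time.Duration) sendResult {
	// Fast path: buffer has room.
	select {
	case ch <- msg:
		return sent
	default:
	}
	// Slow path: wait for room, disconnect, or timeout, whichever is first.
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case ch <- msg:
		return sent
	case <-done:
		return cancelled
	case <-timer.C:
		dropped.Add(1)
		return timedOut
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan quoteUpdate, 1)
	fmt.Println(safeSend(ctx.Done(), ch, quoteUpdate{"q1"}, sendTimeout) == sent)
	fmt.Println(safeSend(ctx.Done(), ch, quoteUpdate{"q2"}, sendTimeout) == timedOut)
	cancel()
	fmt.Println(safeSend(ctx.Done(), ch, quoteUpdate{"q3"}, sendTimeout) == cancelled)
	fmt.Println("dropped:", dropped.Load())
}
```

Returning a result instead of a bool lets the caller distinguish a slow client from a disconnected one, and the explicit timer is stopped on return rather than left to expire.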
Issue 6: Inconsistent Environment Variable Handling
Comparison:
| Service | Pattern | Example |
|---|---|---|
| local-quote-service | Flag + env override | RPC_PROXY_URL overrides -rpc-proxy-url |
| external-quote-service | Flag + env override | HTTP_PORT overrides -http-port |
| pool-discovery-service | Flag only | -rpc-proxy flag, no env override |
Recommendation: Create shared config loader in pkg/config/
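A shared loader mainly needs one precedence rule applied everywhere. The sketch below assumes the rule already used by the two quote services (environment variable overrides flag). resolveString is a hypothetical helper, and the flag values and URLs are placeholders.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// resolveString applies "env overrides flag": it returns the environment
// value when envKey is set and non-empty, otherwise the flag value.
// lookup is injected (os.LookupEnv in production) so the rule is testable.
func resolveString(flagValue, envKey string, lookup func(string) (string, bool)) string {
	if v, ok := lookup(envKey); ok && v != "" {
		return v
	}
	return flagValue
}

func main() {
	// A private FlagSet with fixed arguments keeps the example deterministic;
	// a real service would parse os.Args[1:].
	fs := flag.NewFlagSet("pool-discovery-service", flag.ContinueOnError)
	rpcProxy := fs.String("rpc-proxy", "http://default.example:8080", "RPC proxy URL (placeholder default)")
	if err := fs.Parse([]string{"-rpc-proxy", "http://flag.example:8080"}); err != nil {
		fmt.Println("flag error:", err)
		return
	}
	// RPC_PROXY_URL is assumed here as the env name, matching local-quote-service.
	fmt.Println(resolveString(*rpcProxy, "RPC_PROXY_URL", os.LookupEnv))
}
```

Routing every setting through one function like this would give pool-discovery-service the same override behaviour as the quote services without changing its flags.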
Issue 7: Missing gRPC Health Check
Location: Both main.go files
// TODO: Register Health check service using common/grpc/health.go
// For now, health checks are optional and can be added later
Impact: Kubernetes/load balancers cannot verify gRPC health
Fix: Register health service from go/internal/common/grpc/health.go
Well-Implemented Components
1. Dual Cache Architecture (Local Service)
Location: go/internal/local-quote-service/cache/
- Pool state cache with protocol-specific TTLs
- Quote response cache with pool-aware invalidation
- WebSocket integration for real-time updates
- Metrics callbacks for monitoring
2. Quality Management System
Location: go/internal/local-quote-service/quality/
- PoolQualityManager: Oracle-based validation
- PoolSelectionEngine: Two-stage filtering
- PriorityRefreshManager: Dynamic refresh scheduling
- Rogue pool detection
3. Observability Integration
All services use unified pattern:
obsConfig := observability.Config{
ServiceName: "service-name",
ServiceVersion: "1.0.0",
Environment: cfg.Environment,
OTLPEndpoint: cfg.OTLPEndpoint,
LogLevel: cfg.LogLevel,
}
observability.Init(obsConfig)
4. Pool Discovery NATS Integration
Location: go/internal/pool-discovery/events/events.go
Complete implementation:
- JetStream publisher
- Async publishing with error handling
- Event types: pool.discovery.*, market.price.update
5. gRPC Server Configuration
Both services have proper keep-alive:
kaParams := keepalive.ServerParameters{
Time: 30 * time.Second,
Timeout: 10 * time.Second,
}
kaPolicy := keepalive.EnforcementPolicy{
MinTime: 10 * time.Second,
PermitWithoutStream: true,
}
grpc.MaxConcurrentStreams(1000)
Optimization Roadmap
Phase 1: NATS Integration (BLOCKING) ✅ COMPLETE
Priority: P0
Status: ✅ Completed January 18, 2026
| Task | File | Description | Status |
|---|---|---|---|
| 1.1 | local-quote-service/grpc/server.go | Add NATS publisher to Server struct | ✅ |
| 1.2 | local-quote-service/grpc/server.go | Publish after each safeSend() | ✅ |
| 1.3 | external-quote-service/grpc/server.go | Same as 1.1-1.2 | ✅ |
| 1.4 | local-quote-service/main.go | Inject EventPublisher into gRPC server | ✅ |
| 1.5 | external-quote-service/main.go | Same as 1.4 | ✅ |
| 1.6 | internal/common/events/quote_publisher.go | Shared QuoteEventPublisher | ✅ |
Phase 2: Cache Population (HIGH) ✅ COMPLETE
Priority: P1
Status: ✅ Completed January 18, 2026
| Task | File | Description | Status |
|---|---|---|---|
| 2.1 | external-quote-service/grpc/server.go | Populate RouteCache after API fetch | ✅ |
| 2.2 | external-quote-service/grpc/server.go | Populate PriceCache after API fetch | ✅ |
| 2.3 | - | Add cache metrics | ✅ |
Phase 3: Code Quality (MEDIUM) ✅ COMPLETE
Priority: P2
Status: ✅ Completed January 18, 2026
| Task | File | Description | Status |
|---|---|---|---|
| 3.1 | Both gRPC servers | Remove or use subscribers map | ✅ |
| 3.2 | Both gRPC servers | Add sync.WaitGroup for goroutines | ✅ |
| 3.3 | Both main.go | Register gRPC health check | ✅ |
| 3.4 | pkg/config/ | Create shared config loader | ✅ |
Phase 4: Resilience (LOW) - IN PROGRESS
Priority: P3
Status: 🔄 In Progress
| Task | File | Description | Status |
|---|---|---|---|
| 4.1 | Both gRPC servers | Improve safeSend() channel handling | ✅ |
| 4.2 | Both gRPC servers | Add buffer pressure metrics | ✅ |
| 4.3 | External service | Implement circuit breakers | ✅ |
Phase 5: JetStream Subscription Fix ✅ COMPLETE
Priority: P0 (Critical for Dashboard)
Status: ✅ Completed January 18, 2026
Problem: SwapRouteEvent published via JetStream was not received by event-logger-service which was using core NATS subscriptions.
| Task | File | Description | Status |
|---|---|---|---|
| 5.1 | event-logger-service/jetstream.go | Create JetStream subscription helpers | ✅ |
| 5.2 | event-logger-service/main.go | Use JetStream consumers instead of core NATS | ✅ |
| 5.3 | - | Remove duplicate stream creation (use system-initializer) | ✅ |
Files Modified:
- go/cmd/event-logger-service/jetstream.go (NEW) - JetStream consumer configurations
- go/cmd/event-logger-service/main.go - Updated to use JetStream context
Startup Order:
1. system-initializer (TypeScript) - Creates all 6 JetStream streams
2. event-logger-service (Go) - Subscribes to existing streams
NATS Integration Specification
Subject Pattern
Per architecture document, quote services should publish to:
market.swap_route.<provider>
Examples:
- market.swap_route.local - Local pool math quotes
- market.swap_route.jupiter - Jupiter API quotes
- market.swap_route.dflow - DFlow API quotes
Event Schema
// SwapRouteEvent represents a quote update for NATS publishing
type SwapRouteEvent struct {
EventType string `json:"event_type"` // "swap_route.updated"
EventID string `json:"event_id"` // UUID
Timestamp int64 `json:"timestamp"` // Unix milliseconds
Source string `json:"source"` // Service name
// Quote data
Provider string `json:"provider"` // "local", "jupiter", etc.
InputMint string `json:"input_mint"`
OutputMint string `json:"output_mint"`
InputAmount uint64 `json:"input_amount"`
OutputAmount uint64 `json:"output_amount"`
PriceImpact float64 `json:"price_impact_bps"`
// Routing
Route []RouteHop `json:"route"`
// Metadata
PairID string `json:"pair_id"` // For shared memory lookup
ContextSlot uint64 `json:"context_slot"` // Solana slot
}
type RouteHop struct {
Protocol string `json:"protocol"`
PoolID string `json:"pool_id"`
InputMint string `json:"input_mint"`
OutputMint string `json:"output_mint"`
FeeBps float64 `json:"fee_bps"`
}
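As a quick check of the subject pattern and the JSON tags in the schema, the following sketch builds a subject from a provider name and round-trips a reduced event through encoding/json. It assumes JSON encoding as implied by the struct tags (the event logger is described elsewhere as handling FlatBuffers). swapRouteEvent is a trimmed, hypothetical copy of the schema, and subjectFor, encode, and decode are illustrative helpers.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// swapRouteEvent is a reduced copy of the schema above (scalar fields only).
type swapRouteEvent struct {
	EventType    string `json:"event_type"`
	Timestamp    int64  `json:"timestamp"`
	Provider     string `json:"provider"`
	InputMint    string `json:"input_mint"`
	OutputMint   string `json:"output_mint"`
	InputAmount  uint64 `json:"input_amount"`
	OutputAmount uint64 `json:"output_amount"`
}

// subjectFor builds market.swap_route.<provider>. The provider must be a
// single subject token: no dots, wildcards, or whitespace.
func subjectFor(provider string) (string, error) {
	if provider == "" || strings.ContainsAny(provider, ".*> \t\r\n") {
		return "", errors.New("provider must be a single NATS subject token")
	}
	return "market.swap_route." + provider, nil
}

// encode serializes the event using the schema's JSON tags.
func encode(ev swapRouteEvent) ([]byte, error) { return json.Marshal(ev) }

// decode is the inverse of encode, as a consumer such as Scanner might use.
func decode(b []byte) (swapRouteEvent, error) {
	var ev swapRouteEvent
	err := json.Unmarshal(b, &ev)
	return ev, err
}

func main() {
	ev := swapRouteEvent{
		EventType:    "swap_route.updated",
		Provider:     "jupiter",
		InputMint:    "INPUT_MINT_PLACEHOLDER",
		OutputMint:   "OUTPUT_MINT_PLACEHOLDER",
		InputAmount:  1_000_000_000,
		OutputAmount: 150_000_000,
	}
	subject, err := subjectFor(ev.Provider)
	if err != nil {
		fmt.Println("subject error:", err)
		return
	}
	payload, err := encode(ev)
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Printf("%s %s\n", subject, payload)
}
```

Validating the provider token before publishing keeps a malformed provider name from creating subjects that fall outside the market.swap_route.> subscription used in testing.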
Implementation Pattern
Reference: go/internal/pool-discovery/events/events.go
// In gRPC server
type Server struct {
// ... existing fields
eventPublisher *events.EventPublisher // Add this
}
// After safeSend in calculateAndStreamPairedQuotes:
func (s *Server) calculateAndStreamPairedQuotes(...) {
// ... existing code ...
safeSend(quoteChan, response, ctx)
// NEW: Publish to NATS
if s.eventPublisher != nil {
event := &events.SwapRouteEvent{
EventType: "swap_route.updated",
Provider: "local",
InputMint: inputMint,
OutputMint: outputMint,
InputAmount: amount,
OutputAmount: bestQuote.OutputAmount,
PairID: pairid.Generate(inputMint, outputMint, amount),
Timestamp: time.Now().UnixMilli(),
}
s.eventPublisher.PublishSwapRoute(ctx, event)
}
}
Testing Checklist
Pre-Integration Testing
- Verify local-quote-service starts without errors
- Verify external-quote-service starts without errors
- Verify pool-discovery-service populates Redis
- Verify NATS connection established
gRPC Streaming Testing
- Connect gRPC client to local-quote-service:50052
- Connect gRPC client to external-quote-service:50053
- Verify quotes stream every 10 seconds
- Verify paired quotes (forward + reverse) arrive together
- Test client disconnect handling
NATS Integration Testing (After Implementation)
- Subscribe to market.swap_route.> wildcard
- Verify events published when gRPC streams
- Verify event schema matches specification
- Verify Scanner receives events
- Load test: 45 pairs × 40 amounts = 1,800 events/cycle
Performance Testing
- Measure quote latency (target: <10ms local, <500ms external)
- Measure memory usage per client connection
- Verify goroutine count stabilizes
- Test with multiple concurrent clients
Appendix: File References
Main Service Files
| Service | Main | Additional Files | Notes |
|---|---|---|---|
| local-quote-service | go/cmd/local-quote-service/main.go | go/internal/local-quote-service/grpc/server.go | gRPC + NATS publishing |
| external-quote-service | go/cmd/external-quote-service/main.go | go/internal/external-quote-service/grpc/server.go | gRPC + NATS publishing |
| pool-discovery-service | go/cmd/pool-discovery-service/main.go | go/internal/pool-discovery/events/ | NATS publishing |
| event-logger-service | go/cmd/event-logger-service/main.go | jetstream.go, flatbuffers_handler.go | JetStream subscriptions |
Key Internal Packages
| Package | Purpose |
|---|---|
| internal/local-quote-service/cache/ | Dual cache implementation |
| internal/local-quote-service/calculator/ | Protocol-specific quote math |
| internal/local-quote-service/quality/ | Pool quality management |
| internal/external-quote-service/cache/ | Split cache (route + price) |
| internal/pool-discovery/events/ | NATS event publishing (pool discovery) |
| internal/common/events/ | Shared QuoteEventPublisher for quote services |
| pkg/quoters/ | External API clients |
| pkg/subjects/ | Generated NATS subject constants |
Proto Definition
Location: proto/quote.proto
Key messages:
- QuoteStreamRequest: Client subscription request
- QuoteStreamResponse: Quote update (streamed)
- RoutePlan: Swap route hop details
- OraclePrices: Oracle price data
Document History
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2026-01-17 | Solution Architect | Initial review |
| 1.1 | 2026-01-18 | Solution Architect | Phase 1-4 complete, JetStream fix for event-logger |
Next Steps
- Test End-to-End Pipeline:
  - Start system-initializer to create JetStream streams
  - Start event-logger-service to subscribe to streams
  - Start local-quote-service and external-quote-service
  - Verify SwapRouteEvent appears on Grafana dashboard
- Remaining Work:
  - Full integration testing with scanner-service
  - Load testing with production traffic patterns
  - Performance benchmarking (target: <10ms local, <500ms external)
- Documentation:
  - Update go/cmd/event-logger-service/README.md with JetStream details
  - Add startup order to deployment documentation
