Quote Aggregator Service Implementation

Date: January 18, 2026
Status: ✅ Core Implementation Complete
Base Architecture: 30-QUOTE-SERVICE-ARCHITECTURE.md v3.0 + 30.1-QUOTE-SERVICE-ARCHITECTURE-REVIEW.md v3.1


Implementation Summary

The quote-aggregator-service has been implemented following the v3.1 architecture enhancements. It is the client-facing API: it combines quotes from the Local Quote Service and the External Quote Service and attaches a confidence score to each aggregated quote.


Completed Components

1. Upstream gRPC Client (internal/quote-aggregator-service/client/)

File: upstream_client.go

  • Connects to both local (50052) and external (50053) quote services
  • Uses StreamBatchQuotes for persistent streaming connections
  • Implements health tracking and statistics collection
  • gRPC keepalive configuration for connection stability
  • Thread-safe statistics with atomic operations

Key Features:

  • Parallel streaming from both upstream services
  • Automatic reconnection with exponential backoff
  • Connection health monitoring
  • Latency and error tracking

2. Quote Aggregator (internal/quote-aggregator-service/aggregator/)

File: aggregator.go

  • Real-time merging of local and external quote streams
  • In-memory quote tables with pairID-based deduplication
  • Route hash calculation for duplicate detection
  • Best quote selection with confidence weighting

Key Features:

  • Confidence-weighted quote comparison
  • Price difference tracking and logging
  • Statistics for local-only, external-only, and both-sources quotes
  • Decision recommendation: Execute, Verify, Cautious, Skip
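
A rough sketch of pairID-keyed quote tables with confidence-weighted selection is shown below. The Quote fields, the Tables type, and the weighting rule (output amount multiplied by confidence) are assumptions made for illustration; aggregator.go may compare quotes differently.

```go
package main

import "fmt"

// Quote is a simplified, hypothetical stand-in for an upstream quote.
type Quote struct {
	PairID     string
	Source     string  // "local" or "external"
	OutAmount  float64 // quoted output amount
	Confidence float64 // 0..1
}

// Tables keeps the latest quote per pairID for each source, so a newer
// quote for the same pair replaces the older one (deduplication).
type Tables struct {
	local    map[string]Quote
	external map[string]Quote
}

func NewTables() *Tables {
	return &Tables{local: map[string]Quote{}, external: map[string]Quote{}}
}

// Upsert stores q in the table that matches its source.
func (t *Tables) Upsert(q Quote) {
	if q.Source == "local" {
		t.local[q.PairID] = q
		return
	}
	t.external[q.PairID] = q
}

// weighted is the assumed comparison key: output scaled by confidence.
func weighted(q Quote) float64 { return q.OutAmount * q.Confidence }

// Best returns the preferred quote for pairID, or false if neither
// source has one. Ties go to the local quote.
func (t *Tables) Best(pairID string) (Quote, bool) {
	l, hasL := t.local[pairID]
	e, hasE := t.external[pairID]
	switch {
	case hasL && hasE:
		if weighted(e) > weighted(l) {
			return e, true
		}
		return l, true
	case hasL:
		return l, true
	case hasE:
		return e, true
	}
	return Quote{}, false
}

func main() {
	t := NewTables()
	t.Upsert(Quote{PairID: "SOL/USDC", Source: "local", OutAmount: 1000, Confidence: 0.90})
	t.Upsert(Quote{PairID: "SOL/USDC", Source: "external", OutAmount: 1010, Confidence: 0.80})
	best, _ := t.Best("SOL/USDC")
	fmt.Println(best.Source) // local: 1000*0.90 = 900 beats 1010*0.80 = 808
}
```

Under this rule a slightly better external price does not win when its confidence is noticeably lower, which is the intent of confidence weighting.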

3. gRPC Server (internal/quote-aggregator-service/server/)

File: server.go

Implements QuoteAggregatorServiceServer interface:

  • StreamAggregatedQuotes - Server-streaming RPC for continuous updates
  • GetAggregatedQuote - Unary RPC for single quote with 150ms timeout
  • GetConfidenceBreakdown - Detailed confidence factor analysis
  • Health - Comprehensive health check with upstream service status

Key Features:

  • Active stream counting
  • Request statistics (total, local-only, full)
  • Average confidence tracking
  • Upstream service health reporting

4. NATS Event Publisher (internal/quote-aggregator-service/publisher/)

File: publisher.go

  • Publishes SwapRouteEvent to market.swap_route.optimal
  • FlatBuffers serialization for efficient event encoding
  • JetStream async publishing for non-blocking operation
  • Statistics tracking (published, errors)

Key Features:

  • SwapRouteEvent generation from aggregated quotes
  • Route hop conversion to FlatBuffers format
  • Estimated profit calculation (local vs external)
  • Flush support for graceful shutdown

5. Confidence Scoring Integration

Integrates with existing pkg/confidence package:

  • 5-factor algorithm: Pool Age (30%), Route (20%), Oracle (30%), Provider (10%), Slippage (10%)
  • Separate confidence calculation for local and external quotes
  • Configurable default provider uptime
  • Decision threshold mapping to recommendations
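
The weighted sum behind the 5-factor score can be illustrated as below. The weights come from the list above; the Factors struct, the function names, and the threshold values in Recommend are hypothetical and are not taken from pkg/confidence.

```go
package main

import "fmt"

// Factors holds the five inputs, each normalised to 0..1.
// (Hypothetical type; pkg/confidence may model these differently.)
type Factors struct {
	PoolAge, Route, Oracle, Provider, Slippage float64
}

// Score applies the documented weights: 30/20/30/10/10.
func Score(f Factors) float64 {
	return 0.30*f.PoolAge +
		0.20*f.Route +
		0.30*f.Oracle +
		0.10*f.Provider +
		0.10*f.Slippage
}

// Recommend maps a score to one of the four recommendations.
// The cut-offs here are illustrative assumptions only.
func Recommend(score float64) string {
	switch {
	case score >= 0.90:
		return "Execute"
	case score >= 0.75:
		return "Verify"
	case score >= 0.50:
		return "Cautious"
	default:
		return "Skip"
	}
}

func main() {
	f := Factors{PoolAge: 1.0, Route: 0.8, Oracle: 0.9, Provider: 1.0, Slippage: 0.5}
	s := Score(f)
	fmt.Printf("%.2f %s\n", s, Recommend(s))
}
```

Because the weights sum to 1, the score stays in the same 0..1 range as its inputs, so thresholds can be expressed directly on that scale.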

6. Main Application (cmd/quote-aggregator-service/main.go)

Complete wiring of all components:

  • Command-line flags and environment variable configuration
  • Dependency injection pattern
  • HTTP server with Gin framework (health, stats, metrics)
  • gRPC server with keepalive
  • Graceful shutdown with timeout
  • Observability integration (OpenTelemetry)

API Endpoints

gRPC (Port 50051)

service QuoteAggregatorService {
  // Stream aggregated quotes with confidence scores
  rpc StreamAggregatedQuotes(BatchQuoteRequest) returns (stream AggregatedQuote);

  // Get single aggregated quote
  rpc GetAggregatedQuote(QuoteRequest) returns (AggregatedQuote);

  // Get detailed confidence breakdown
  rpc GetConfidenceBreakdown(ConfidenceRequest) returns (ConfidenceResponse);

  // Health check
  rpc Health(HealthRequest) returns (AggregatorHealthResponse);
}

HTTP (Port 8080)

Endpoint        Description
GET /health     Service health with upstream status
GET /stats      Aggregation statistics
GET /metrics    Prometheus metrics

Configuration

Command-Line Flags

quote-aggregator-service \
  -http-port 8080 \
  -grpc-port 50051 \
  -local-quote-addr localhost:50052 \
  -external-quote-addr localhost:50053 \
  -nats nats://localhost:4222 \
  -local-timeout 10ms \
  -external-timeout 100ms \
  -enable-confidence-scoring=true

Environment Variables

Variable                       Description
NATS_URL                       NATS server URL
LOCAL_QUOTE_SERVICE_ADDR       Local quote service address
EXTERNAL_QUOTE_SERVICE_ADDR    External quote service address
OTEL_EXPORTER_OTLP_ENDPOINT    OpenTelemetry endpoint

Architecture Flow

┌─────────────────────────────────────────────────────────────────────┐
│                     Quote Aggregator Service                        │
│                          (Port 50051)                               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────────┐         ┌──────────────────┐                 │
│  │  UpstreamClient  │         │    Aggregator    │                 │
│  │                  │         │                  │                 │
│  │  StreamLocal()───┼────────►│  localQuotes     │                 │
│  │  StreamExternal()┼────────►│  externalQuotes  │                 │
│  │                  │         │                  │                 │
│  │  Health tracking │         │  buildAggregated │                 │
│  │  Statistics      │         │  selectBestQuote │                 │
│  └────────┬─────────┘         └────────┬─────────┘                 │
│           │                            │                            │
│           │                            ▼                            │
│           │                   ┌──────────────────┐                 │
│           │                   │  Confidence Calc │                 │
│           │                   │                  │                 │
│           │                   │  5-factor score  │                 │
│           │                   └────────┬─────────┘                 │
│           │                            │                            │
│           ▼                            ▼                            │
│  ┌──────────────────┐         ┌──────────────────┐                 │
│  │ Local Quote Svc  │         │   gRPC Server    │                 │
│  │   (50052)        │         │                  │                 │
│  ├──────────────────┤         │  Stream/Unary    │                 │
│  │ External Quote   │         │  Health/Stats    │                 │
│  │   (50053)        │         └────────┬─────────┘                 │
│  └──────────────────┘                  │                            │
│                                        ▼                            │
│                               ┌──────────────────┐                 │
│                               │  EventPublisher  │                 │
│                               │                  │                 │
│                               │  SwapRouteEvent  │───► NATS        │
│                               │  (FlatBuffers)   │    JetStream    │
│                               └──────────────────┘                 │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
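
The left half of the diagram, where two upstream streams feed one aggregator, can be pictured as a channel fan-in. The sketch below uses plain Go channels in place of gRPC streams; the Quote type and the merge and emit helpers are hypothetical and only illustrate the data flow.

```go
package main

import (
	"fmt"
	"sync"
)

// Quote is a simplified, hypothetical quote message.
type Quote struct {
	PairID    string
	Source    string
	OutAmount uint64
}

// merge fans any number of quote streams into one channel, which is
// closed once every input stream has been drained.
func merge(streams ...<-chan Quote) <-chan Quote {
	out := make(chan Quote)
	var wg sync.WaitGroup
	for _, s := range streams {
		wg.Add(1)
		go func(s <-chan Quote) {
			defer wg.Done()
			for q := range s {
				out <- q
			}
		}(s)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit returns a closed, pre-filled channel that stands in for a stream.
func emit(qs ...Quote) <-chan Quote {
	ch := make(chan Quote, len(qs))
	for _, q := range qs {
		ch <- q
	}
	close(ch)
	return ch
}

func main() {
	local := emit(
		Quote{PairID: "SOL/USDC", Source: "local", OutAmount: 1000},
		Quote{PairID: "SOL/USDC", Source: "local", OutAmount: 1002},
	)
	external := emit(Quote{PairID: "SOL/USDC", Source: "external", OutAmount: 1010})

	// Keep only the latest quote per source and pair, as the quote tables do.
	latest := map[string]Quote{}
	for q := range merge(local, external) {
		latest[q.Source+"/"+q.PairID] = q
	}
	fmt.Println(len(latest))
}
```

A single consumer loop over the merged channel means the quote tables are only written from one goroutine, so this arrangement needs no extra locking.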

Next Steps

Phase 1: Integration Testing

  • Test with running local-quote-service
  • Test with running external-quote-service
  • Verify NATS event publishing
  • Test gRPC streaming with ts-scanner-service

Phase 2: Shared Memory Writer

  • Implement dual shared memory writer (quotes-local.mmap, quotes-external.mmap)
  • Ring buffer with hybrid change detection
  • Rust scanner integration

Phase 3: Production Hardening

  • Add circuit breaker for upstream services
  • Implement retry with exponential backoff
  • Add rate limiting for downstream clients
  • Comprehensive metrics and alerting

Files Created/Modified

File                                                              Status
go/internal/quote-aggregator-service/client/upstream_client.go    ✅ Created
go/internal/quote-aggregator-service/aggregator/aggregator.go     ✅ Created
go/internal/quote-aggregator-service/server/server.go             ✅ Created
go/internal/quote-aggregator-service/publisher/publisher.go       ✅ Created
go/cmd/quote-aggregator-service/main.go                           ✅ Updated

Compilation Status

$ go build ./cmd/quote-aggregator-service/...
# Success - no errors

$ go build ./internal/quote-aggregator-service/...
# Success - no errors

Related Documents

  • 30-QUOTE-SERVICE-ARCHITECTURE.md - Base architecture (v3.0)
  • 30.1-QUOTE-SERVICE-ARCHITECTURE-REVIEW.md - v3.1 enhancements
  • 30.2-SHARED-MEMORY-HYBRID-CHANGE-DETECTION.md - Shared memory design
  • proto/quote_aggregator.proto - Proto definitions