Quote Aggregator Service Implementation
Date: January 18, 2026
Status: ✅ Core Implementation Complete
Base Architecture: 30-QUOTE-SERVICE-ARCHITECTURE.md v3.0 + 30.1-QUOTE-SERVICE-ARCHITECTURE-REVIEW.md v3.1
Implementation Summary
The quote-aggregator-service has been implemented following the v3.1 architecture enhancements. It is the client-facing API: it combines quotes from the Local Quote Service and the External Quote Service and attaches confidence scoring to the result.
Completed Components
1. Upstream gRPC Client (internal/quote-aggregator-service/client/)
File: upstream_client.go
- Connects to both local (50052) and external (50053) quote services
- Uses StreamBatchQuotes for persistent streaming connections
- Implements health tracking and statistics collection
- gRPC keepalive configuration for connection stability
- Thread-safe statistics with atomic operations
Key Features:
- Parallel streaming from both upstream services
- Automatic reconnection with exponential backoff
- Connection health monitoring
- Latency and error tracking
2. Quote Aggregator (internal/quote-aggregator-service/aggregator/)
File: aggregator.go
- Real-time merging of local and external quote streams
- In-memory quote tables with pairID-based deduplication
- Route hash calculation for duplicate detection
- Best quote selection with confidence weighting
Key Features:
- Confidence-weighted quote comparison
- Price difference tracking and logging
- Statistics for local-only, external-only, and both-sources quotes
- Decision recommendation: Execute, Verify, Cautious, Skip
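To make the route hash and the confidence-weighted comparison concrete, here is a small sketch. The `Quote` type, the SHA-256 fingerprint over ordered hop identifiers, and the rule "output amount multiplied by confidence" are assumptions made for illustration; the actual logic in aggregator.go may differ.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// Quote is a hypothetical, trimmed-down quote record.
type Quote struct {
	Source     string   // "local" or "external"
	Hops       []string // pool identifiers in route order
	OutAmount  float64  // quoted output amount
	Confidence float64  // 0.0 to 1.0
}

// routeHash fingerprints a route by its ordered hops so that two quotes
// describing the same path can be recognized as duplicates.
// Assumes hop identifiers never contain a NUL byte.
func routeHash(hops []string) string {
	sum := sha256.Sum256([]byte(strings.Join(hops, "\x00")))
	return hex.EncodeToString(sum[:8])
}

// selectBest returns the quote with the higher confidence-weighted output.
// Ties and a missing external quote both fall back to the local quote.
func selectBest(local, external *Quote) *Quote {
	switch {
	case local == nil:
		return external
	case external == nil:
		return local
	}
	if external.OutAmount*external.Confidence > local.OutAmount*local.Confidence {
		return external
	}
	return local
}

func main() {
	local := &Quote{Source: "local", Hops: []string{"poolA", "poolB"}, OutAmount: 100, Confidence: 0.9}
	external := &Quote{Source: "external", Hops: []string{"poolA", "poolB"}, OutAmount: 101, Confidence: 0.8}

	fmt.Println("same route:", routeHash(local.Hops) == routeHash(external.Hops))
	fmt.Println("best source:", selectBest(local, external).Source)
}
```

In this example the external quote offers a slightly higher output, but its lower confidence makes the local quote the weighted winner.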
3. gRPC Server (internal/quote-aggregator-service/server/)
File: server.go
Implements QuoteAggregatorServiceServer interface:
- StreamAggregatedQuotes - Server-streaming RPC for continuous updates
- GetAggregatedQuote - Unary RPC for single quote with 150ms timeout
- GetConfidenceBreakdown - Detailed confidence factor analysis
- Health - Comprehensive health check with upstream service status
Key Features:
- Active stream counting
- Request statistics (total, local-only, full)
- Average confidence tracking
- Upstream service health reporting
4. NATS Event Publisher (internal/quote-aggregator-service/publisher/)
File: publisher.go
- Publishes SwapRouteEvent to market.swap_route.optimal
- FlatBuffers serialization for efficient event encoding
- JetStream async publishing for non-blocking operation
- Statistics tracking (published, errors)
Key Features:
- SwapRouteEvent generation from aggregated quotes
- Route hop conversion to FlatBuffers format
- Estimated profit calculation (local vs external)
- Flush support for graceful shutdown
5. Confidence Scoring Integration
Integrates with existing pkg/confidence package:
- 5-factor algorithm: Pool Age (30%), Route (20%), Oracle (30%), Provider (10%), Slippage (10%)
- Separate confidence calculation for local and external quotes
- Configurable default provider uptime
- Decision threshold mapping to recommendations
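The weighted sum behind the 5-factor score can be written out directly from the weights listed above. The `Factors` type, the assumption that every factor is normalized to [0, 1], and especially the threshold values in `recommend` are placeholders for illustration; the real thresholds live in pkg/confidence and are not stated in this document.

```go
package main

import "fmt"

// Factors holds the five inputs, each assumed to be normalized to [0, 1].
type Factors struct {
	PoolAge  float64
	Route    float64
	Oracle   float64
	Provider float64
	Slippage float64
}

// score combines the factors using the documented weights:
// Pool Age 30%, Route 20%, Oracle 30%, Provider 10%, Slippage 10%.
func score(f Factors) float64 {
	return 0.30*f.PoolAge + 0.20*f.Route + 0.30*f.Oracle +
		0.10*f.Provider + 0.10*f.Slippage
}

// recommend maps a score to one of the four recommendations.
// The cut-off values below are placeholders, not the service's thresholds.
func recommend(s float64) string {
	switch {
	case s >= 0.9:
		return "Execute"
	case s >= 0.7:
		return "Verify"
	case s >= 0.5:
		return "Cautious"
	default:
		return "Skip"
	}
}

func main() {
	f := Factors{PoolAge: 1.0, Route: 0.5, Oracle: 0.8, Provider: 0.9, Slippage: 0.7}
	s := score(f)
	fmt.Printf("score=%.2f recommendation=%s\n", s, recommend(s))
}
```

Because the weights sum to 1, a quote that scores 1.0 on every factor receives an overall score of 1.0.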
6. Main Application (cmd/quote-aggregator-service/main.go)
Complete wiring of all components:
- Command-line flags and environment variable configuration
- Dependency injection pattern
- HTTP server with Gin framework (health, stats, metrics)
- gRPC server with keepalive
- Graceful shutdown with timeout
- Observability integration (OpenTelemetry)
API Endpoints
gRPC (Port 50051)
service QuoteAggregatorService {
  // Stream aggregated quotes with confidence scores
  rpc StreamAggregatedQuotes(BatchQuoteRequest) returns (stream AggregatedQuote);

  // Get single aggregated quote
  rpc GetAggregatedQuote(QuoteRequest) returns (AggregatedQuote);

  // Get detailed confidence breakdown
  rpc GetConfidenceBreakdown(ConfidenceRequest) returns (ConfidenceResponse);

  // Health check
  rpc Health(HealthRequest) returns (AggregatorHealthResponse);
}
HTTP (Port 8080)
| Endpoint | Description |
|---|---|
| GET /health | Service health with upstream status |
| GET /stats | Aggregation statistics |
| GET /metrics | Prometheus metrics |
Configuration
Command-Line Flags
quote-aggregator-service \
  -http-port 8080 \
  -grpc-port 50051 \
  -local-quote-addr localhost:50052 \
  -external-quote-addr localhost:50053 \
  -nats nats://localhost:4222 \
  -local-timeout 10ms \
  -external-timeout 100ms \
  -enable-confidence-scoring=true
Environment Variables
| Variable | Description |
|---|---|
| NATS_URL | NATS server URL |
| LOCAL_QUOTE_SERVICE_ADDR | Local quote service address |
| EXTERNAL_QUOTE_SERVICE_ADDR | External quote service address |
| OTEL_EXPORTER_OTLP_ENDPOINT | OpenTelemetry endpoint |
Architecture Flow
┌─────────────────────────────────────────────────────────────────────┐
│                      Quote Aggregator Service                       │
│                            (Port 50051)                             │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────────┐         ┌──────────────────┐                  │
│  │  UpstreamClient  │         │   Aggregator     │                  │
│  │                  │         │                  │                  │
│  │  StreamLocal()───┼────────►│  localQuotes     │                  │
│  │  StreamExternal()┼────────►│  externalQuotes  │                  │
│  │                  │         │                  │                  │
│  │  Health tracking │         │  buildAggregated │                  │
│  │  Statistics      │         │  selectBestQuote │                  │
│  └────────┬─────────┘         └────────┬─────────┘                  │
│           │                            │                            │
│           │                            ▼                            │
│           │                   ┌──────────────────┐                  │
│           │                   │ Confidence Calc  │                  │
│           │                   │                  │                  │
│           │                   │  5-factor score  │                  │
│           │                   └────────┬─────────┘                  │
│           │                            │                            │
│           ▼                            ▼                            │
│  ┌──────────────────┐         ┌──────────────────┐                  │
│  │ Local Quote Svc  │         │   gRPC Server    │                  │
│  │     (50052)      │         │                  │                  │
│  ├──────────────────┤         │  Stream/Unary    │                  │
│  │ External Quote   │         │  Health/Stats    │                  │
│  │     (50053)      │         └────────┬─────────┘                  │
│  └──────────────────┘                  │                            │
│                                        ▼                            │
│                               ┌──────────────────┐                  │
│                               │  EventPublisher  │                  │
│                               │                  │                  │
│                               │  SwapRouteEvent  │───► NATS         │
│                               │  (FlatBuffers)   │     JetStream    │
│                               └──────────────────┘                  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
Next Steps
Phase 1: Integration Testing
- Test with running local-quote-service
- Test with running external-quote-service
- Verify NATS event publishing
- Test gRPC streaming with ts-scanner-service
Phase 2: Shared Memory Writer
- Implement dual shared memory writer (quotes-local.mmap, quotes-external.mmap)
- Ring buffer with hybrid change detection
- Rust scanner integration
Phase 3: Production Hardening
- Add circuit breaker for upstream services
- Implement retry with exponential backoff
- Add rate limiting for downstream clients
- Comprehensive metrics and alerting
Files Created/Modified
| File | Status |
|---|---|
| go/internal/quote-aggregator-service/client/upstream_client.go | ✅ Created |
| go/internal/quote-aggregator-service/aggregator/aggregator.go | ✅ Created |
| go/internal/quote-aggregator-service/server/server.go | ✅ Created |
| go/internal/quote-aggregator-service/publisher/publisher.go | ✅ Created |
| go/cmd/quote-aggregator-service/main.go | ✅ Updated |
Compilation Status
$ go build ./cmd/quote-aggregator-service/...
# Success - no errors
$ go build ./internal/quote-aggregator-service/...
# Success - no errors
Related Documents
- 30-QUOTE-SERVICE-ARCHITECTURE.md - Base architecture (v3.0)
- 30.1-QUOTE-SERVICE-ARCHITECTURE-REVIEW.md - v3.1 enhancements
- 30.2-SHARED-MEMORY-HYBRID-CHANGE-DETECTION.md - Shared memory design
- proto/quote_aggregator.proto - Proto definitions
