Scanner → Planner → Executor Architecture
Scanner → Planner → Executor Architecture
Overview
This document provides detailed design patterns for the core Scanner → Planner → Executor architecture, with concrete examples and event schemas.
Architecture Principles
1. Separation of Concerns
- Scanners: Observe and collect data (no decision-making)
- Planners: Analyze data and make decisions (no execution)
- Executors: Execute decisions (no strategy logic)
2. Event-Driven Communication
- All components communicate via NATS events
- Loose coupling enables independent scaling
- Event replay for debugging and analysis
3. Idempotency
- All operations designed to be safely retried
- Event deduplication where needed
- Transaction IDs for tracking
4. Observable
- All events logged and traced
- Metrics at every boundary
- Clear error propagation
SCANNERS Layer
Purpose
Continuously monitor blockchain state and external data sources, emitting normalized events when relevant changes occur.
Design Patterns
Pattern 1: Polling Scanner
abstract class PollingScanner {
abstract pollInterval: number;
abstract natsSubject: string;
async start() {
while (true) {
try {
const data = await this.fetch();
const events = await this.process(data);
for (const event of events) {
await this.publish(event);
}
} catch (error) {
this.logger.error({ error }, "Polling failed");
}
await sleep(this.pollInterval);
}
}
abstract fetch(): Promise<any>;
abstract process(data: any): Promise<Event[]>;
async publish(event: Event) {
await this.nats.publish(this.natsSubject, JSON.stringify(event));
this.metrics.eventsPublished.inc();
}
}
Example: Price Feed Scanner
class PriceFeedScanner extends PollingScanner {
pollInterval = 5000; // 5 seconds
natsSubject = "market.prices.update";
async fetch() {
// Fetch prices from multiple DEXes concurrently
const [raydiumPrices, meteoraPrices, jupiterPrices] = await Promise.all([
this.fetchRaydiumPrices(),
this.fetchMeteoraPrices(),
this.fetchJupiterPrices(),
]);
return { raydium: raydiumPrices, meteora: meteoraPrices, jupiter: jupiterPrices };
}
async process(data: any): Promise<PriceUpdateEvent[]> {
const events: PriceUpdateEvent[] = [];
for (const token of this.watchedTokens) {
const prices = {
raydium: data.raydium[token] ?? null,
meteora: data.meteora[token] ?? null,
jupiter: data.jupiter[token] ?? null,
};
// Calculate best bid/ask
const validPrices = Object.values(prices).filter(p => p !== null);
if (validPrices.length === 0) continue;
const bestBid = Math.max(...validPrices);
const bestAsk = Math.min(...validPrices);
const spread = ((bestAsk - bestBid) / bestBid) * 100;
events.push({
type: "price_update",
token,
prices,
bestBid,
bestAsk,
spread,
timestamp: Date.now(),
});
}
return events;
}
}
Pattern 2: Subscription Scanner
abstract class SubscriptionScanner {
abstract natsSubject: string;
async start() {
const subscription = await this.subscribe();
for await (const update of subscription) {
try {
const events = await this.process(update);
for (const event of events) {
// Deduplicate
const isDuplicate = await this.checkDuplicate(event);
if (isDuplicate) continue;
await this.publish(event);
}
} catch (error) {
this.logger.error({ error, update }, "Processing failed");
}
}
}
abstract subscribe(): Promise<AsyncIterable<any>>;
abstract process(update: any): Promise<Event[]>;
async checkDuplicate(event: Event): Promise<boolean> {
const key = `dedup:${event.type}:${this.getEventId(event)}`;
const exists = await this.redis.set(key, "1", "EX", 5, "NX");
return exists === null; // null = key already existed
}
abstract getEventId(event: Event): string;
}
Example: Account Change Scanner
class AccountChangeScanner extends SubscriptionScanner {
natsSubject = "market.events.account_change";
async subscribe() {
// Subscribe to Shredstream or WebSocket
return this.solana.onAccountChange(
this.watchedAccounts,
{ commitment: "confirmed" }
);
}
async process(update: AccountUpdate): Promise<AccountChangeEvent[]> {
const account = update.accountId;
const data = update.accountInfo.data;
// Decode based on account type
if (this.isRaydiumPool(account)) {
const pool = decodeRaydiumPool(data);
return [{
type: "pool_update",
protocol: "raydium",
poolAddress: account,
tokenA: pool.baseMint,
tokenB: pool.quoteMint,
reserveA: pool.baseReserve,
reserveB: pool.quoteReserve,
timestamp: Date.now(),
slot: update.context.slot,
}];
}
// Other account types...
return [];
}
getEventId(event: AccountChangeEvent): string {
return `${event.poolAddress}:${event.slot}`;
}
}
Scanner Event Schemas
PriceUpdateEvent
interface PriceUpdateEvent {
type: "price_update";
token: Address;
prices: {
[dex: string]: number | null;
};
bestBid: number;
bestAsk: number;
spread: number; // percentage
volume24h?: number;
liquidity?: number;
timestamp: number;
}
// NATS subject: market.prices.update
AccountChangeEvent
interface AccountChangeEvent {
type: "pool_update" | "large_swap" | "liquidity_change";
protocol: string;
poolAddress: Address;
tokenA: Address;
tokenB: Address;
reserveA?: bigint;
reserveB?: bigint;
priceImpact?: number;
timestamp: number;
slot: number;
data: Record<string, any>;
}
// NATS subject: market.events.account_change
VolumeUpdateEvent
interface VolumeUpdateEvent {
type: "volume_update";
token: Address;
volume24h: number;
volumeChange: number; // percentage change from previous period
avgTradeSize: number;
tradeCount: number;
timestamp: number;
}
// NATS subject: market.volume.update
PLANNERS Layer
Purpose
Subscribe to scanner events, analyze data, identify trading opportunities, and emit execution plans.
Design Patterns
Pattern 1: Opportunity Detector
abstract class OpportunityDetector {
abstract strategy: string;
abstract subscribeToSubjects: string[];
abstract publishSubject: string;
async start() {
const nc = await this.nats.connect();
const js = nc.jetstream();
// Subscribe to multiple event types
for (const subject of this.subscribeToSubjects) {
const consumer = await js.consumers.get(subject, `${this.strategy}-planner`);
const messages = await consumer.consume();
this.processMessages(messages);
}
}
async processMessages(messages: AsyncIterable<Msg>) {
for await (const msg of messages) {
try {
const event = JSON.parse(msg.data.toString());
const opportunities = await this.analyze(event);
for (const opportunity of opportunities) {
await this.publish(opportunity);
}
msg.ack();
} catch (error) {
this.logger.error({ error }, "Analysis failed");
msg.nak();
}
}
}
abstract analyze(event: any): Promise<TradeOpportunity[]>;
async publish(opportunity: TradeOpportunity) {
await this.nats.publish(this.publishSubject, JSON.stringify(opportunity));
this.metrics.opportunitiesDetected.inc({ strategy: this.strategy });
}
}
Example: Arbitrage Planner
class ArbitragePlanner extends OpportunityDetector {
strategy = "arbitrage";
subscribeToSubjects = ["market.prices.update"];
publishSubject = "trade.opportunities.arbitrage";
async analyze(event: PriceUpdateEvent): Promise<TradeOpportunity[]> {
const opportunities: TradeOpportunity[] = [];
// Find all tradeable pairs with this token
const pairs = this.generateTradingPairs(event.token);
for (const pair of pairs) {
// Try different trade amounts
for (const amount of this.testAmounts) {
const opportunity = await this.checkArbitrage(
pair.tokenA,
pair.tokenB,
amount
);
if (opportunity && opportunity.expectedProfit > this.minProfit) {
opportunities.push(opportunity);
}
}
}
return opportunities;
}
async checkArbitrage(
tokenA: Address,
tokenB: Address,
amount: bigint
): Promise<TradeOpportunity | null> {
// Get quote: tokenA → tokenB
const quote1 = await this.quoteService.getQuote({
inputMint: tokenA,
outputMint: tokenB,
amount: amount.toString(),
});
if (!quote1) return null;
// Get quote: tokenB → tokenA
const quote2 = await this.quoteService.getQuote({
inputMint: tokenB,
outputMint: tokenA,
amount: quote1.outputAmount,
});
if (!quote2) return null;
// Calculate profit
const outAmount = BigInt(quote2.outputAmount);
const profit = outAmount - amount;
// Subtract fees
const flashLoanFee = amount * 5n / 10000n; // 0.05%
const jitoTip = 5000n + BigInt(Math.floor(Math.random() * 1000));
const netProfit = profit - flashLoanFee - jitoTip;
if (netProfit <= 0n) return null;
// Check if we've seen this route before (cache)
const routeHash = this.hashRoute([quote1.routePlan, quote2.routePlan]);
const cached = await this.redis.get(`route:${routeHash}`);
return {
id: uuid(),
strategy: "arbitrage",
tokenA,
tokenB,
amount: amount.toString(),
expectedProfit: netProfit.toString(),
priority: this.calculatePriority(netProfit, amount),
routes: [
{
inputMint: tokenA,
outputMint: tokenB,
routePlan: quote1.routePlan,
expectedOutput: quote1.outputAmount,
},
{
inputMint: tokenB,
outputMint: tokenA,
routePlan: quote2.routePlan,
expectedOutput: quote2.outputAmount,
},
],
useFlashLoan: true,
urgency: "high",
expiresAt: Date.now() + 5000, // 5 seconds
metadata: {
quote1Latency: quote1.latency,
quote2Latency: quote2.latency,
quote1Source: quote1.source,
quote2Source: quote2.source,
},
};
}
calculatePriority(profit: bigint, amount: bigint): number {
// Higher profit = higher priority
// Larger amounts = lower priority (more risk)
const profitPercent = Number(profit) / Number(amount);
return Math.min(100, Math.floor(profitPercent * 10000));
}
}
Example: Grid Trading Planner
class GridTradingPlanner extends OpportunityDetector {
strategy = "grid_trading";
subscribeToSubjects = ["market.prices.update"];
publishSubject = "trade.opportunities.grid";
private gridLevels: Map<Address, GridLevel[]> = new Map();
async analyze(event: PriceUpdateEvent): Promise<TradeOpportunity[]> {
const token = event.token;
const currentPrice = event.bestBid;
// Get or initialize grid for this token
let levels = this.gridLevels.get(token);
if (!levels) {
levels = this.initializeGrid(token, currentPrice);
this.gridLevels.set(token, levels);
}
const opportunities: TradeOpportunity[] = [];
for (const level of levels) {
// Check if price crossed buy level
if (currentPrice <= level.buyPrice && !level.buyFilled) {
opportunities.push({
id: uuid(),
strategy: "grid_trading",
tokenA: this.quoteCurrency, // USDC
tokenB: token,
amount: level.amount.toString(),
expectedProfit: "0", // Grid trading profits accumulate over time
priority: 50,
routes: [{
inputMint: this.quoteCurrency,
outputMint: token,
routePlan: [], // Will be filled by executor
expectedOutput: (BigInt(level.amount) * BigInt(Math.floor(currentPrice * 1e9)) / 1000000000n).toString(),
}],
useFlashLoan: false,
urgency: "low",
expiresAt: Date.now() + level.ttl,
metadata: {
gridLevel: level.index,
buyPrice: level.buyPrice,
sellPrice: level.sellPrice,
},
});
level.buyFilled = true;
level.sellFilled = false; // Reset sell for next cycle
}
// Check if price crossed sell level
if (currentPrice >= level.sellPrice && !level.sellFilled && level.buyFilled) {
opportunities.push({
id: uuid(),
strategy: "grid_trading",
tokenA: token,
tokenB: this.quoteCurrency,
amount: level.amount.toString(),
expectedProfit: ((level.sellPrice - level.buyPrice) * level.amount).toString(),
priority: 60,
routes: [{
inputMint: token,
outputMint: this.quoteCurrency,
routePlan: [],
expectedOutput: (BigInt(level.amount) * BigInt(Math.floor(currentPrice * 1e9)) / 1000000000n).toString(),
}],
useFlashLoan: false,
urgency: "low",
expiresAt: Date.now() + level.ttl,
metadata: {
gridLevel: level.index,
buyPrice: level.buyPrice,
sellPrice: level.sellPrice,
profit: (level.sellPrice - level.buyPrice) * level.amount,
},
});
level.sellFilled = true;
level.buyFilled = false; // Reset buy for next cycle
}
}
return opportunities;
}
initializeGrid(token: Address, basePrice: number): GridLevel[] {
const gridConfig = this.config.grids[token] || this.config.defaultGrid;
const levels: GridLevel[] = [];
for (let i = 0; i < gridConfig.levelCount; i++) {
const offset = (i - gridConfig.levelCount / 2) * gridConfig.spacing;
const buyPrice = basePrice * (1 + offset / 100);
const sellPrice = basePrice * (1 + (offset + gridConfig.spacing) / 100);
levels.push({
index: i,
buyPrice,
sellPrice,
amount: gridConfig.amountPerLevel,
buyFilled: false,
sellFilled: false,
ttl: gridConfig.orderTTL || 12 * 60 * 60 * 1000, // 12 hours
});
}
return levels;
}
}
Planner Event Schemas
TradeOpportunity
interface TradeOpportunity {
id: string; // UUID
strategy: "arbitrage" | "grid_trading" | "dca" | "ai_signal";
tokenA: Address;
tokenB: Address;
amount: string; // bigint as string
expectedProfit: string; // bigint as string
priority: number; // 0-100, higher = more urgent
routes: RouteStep[];
useFlashLoan: boolean;
urgency: "low" | "medium" | "high";
expiresAt: number; // timestamp
metadata: Record<string, any>;
}
interface RouteStep {
inputMint: Address;
outputMint: Address;
routePlan: any[]; // Jupiter route plan or empty
expectedOutput: string;
}
// NATS subject: trade.opportunities.{strategy}
EXECUTORS Layer
Purpose
Subscribe to trade opportunities, select appropriate execution method, build and submit transactions, monitor confirmations.
Design Patterns
Pattern 1: Transaction Coordinator
class TransactionCoordinator {
private semaphore: Semaphore;
private executors: Map<string, Executor>;
async start() {
this.semaphore = new Semaphore(this.config.maxConcurrentTrades);
const nc = await this.nats.connect();
const js = nc.jetstream();
// Subscribe to all opportunity types
const consumer = await js.consumers.get("trade.opportunities", "executor-coordinator");
const messages = await consumer.consume();
for await (const msg of messages) {
// Don't block on execution
this.execute(msg).catch(error => {
this.logger.error({ error }, "Execution failed");
});
}
}
async execute(msg: Msg) {
try {
const opportunity: TradeOpportunity = JSON.parse(msg.data.toString());
// Check if expired
if (Date.now() > opportunity.expiresAt) {
this.logger.warn({ opportunityId: opportunity.id }, "Opportunity expired");
msg.ack();
return;
}
// Acquire semaphore (limit concurrency)
await this.semaphore.acquire();
try {
// Select wallet
const wallet = await this.walletManager.selectWallet({
requiredAmount: BigInt(opportunity.amount),
requiredToken: opportunity.tokenA,
tier: "worker",
});
if (!wallet) {
this.logger.warn({ opportunity }, "No wallet available");
msg.nak();
return;
}
// Select executor
const executor = this.selectExecutor(opportunity);
// Execute
const result = await executor.execute({
opportunity,
wallet,
timeout: 30000,
});
// Publish result
await this.publishResult(result);
msg.ack();
} finally {
this.semaphore.release();
}
} catch (error) {
this.logger.error({ error }, "Execution error");
msg.nak();
}
}
selectExecutor(opportunity: TradeOpportunity): Executor {
// High profit = use Jito for MEV protection
if (Number(opportunity.expectedProfit) > 0.1 * LAMPORTS_PER_SOL) {
return this.executors.get("jito")!;
}
// Time-sensitive strategies = use Jito
if (opportunity.urgency === "high") {
return this.executors.get("jito")!;
}
// Low urgency = use TPU (save on tips)
return this.executors.get("tpu")!;
}
async publishResult(result: ExecutionResult) {
await this.nats.publish("execution.results", JSON.stringify(result));
this.metrics.tradesExecuted.inc({
strategy: result.strategy,
status: result.status,
});
}
}
Pattern 2: Jito Executor
class JitoExecutor implements Executor {
async execute(request: ExecutionRequest): Promise<ExecutionResult> {
const startTime = Date.now();
const span = this.tracer.startSpan("jito_execution");
try {
// Build transaction
span.addEvent("building_transaction");
const tx = await this.buildTransaction(request);
// Sign
span.addEvent("signing_transaction");
const signedTx = await this.signTransaction(tx, request.wallet);
// Submit to Jito
span.addEvent("submitting_bundle");
const bundleId = await this.submitBundle(signedTx);
// Monitor confirmation
span.addEvent("monitoring_confirmation");
const confirmation = await this.monitorBundle(bundleId, request.timeout);
// Parse actual profit from logs
const actualProfit = await this.parseProfit(confirmation);
span.setStatus({ code: SpanStatusCode.OK });
return {
id: request.opportunity.id,
strategy: request.opportunity.strategy,
status: "success",
signature: confirmation.signature,
bundleId,
expectedProfit: request.opportunity.expectedProfit,
actualProfit: actualProfit.toString(),
latency: Date.now() - startTime,
timestamp: Date.now(),
metadata: {
slot: confirmation.slot,
executor: "jito",
},
};
} catch (error) {
span.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
return {
id: request.opportunity.id,
strategy: request.opportunity.strategy,
status: "failed",
error: error.message,
latency: Date.now() - startTime,
timestamp: Date.now(),
metadata: {
executor: "jito",
},
};
} finally {
span.end();
}
}
async buildTransaction(request: ExecutionRequest): Promise<Transaction> {
const instructions: TransactionInstruction[] = [];
// Flash loan borrow (if needed)
if (request.opportunity.useFlashLoan) {
const flashBorrowIx = await this.buildFlashLoanBorrowIx(
request.wallet.address,
request.opportunity.tokenA,
BigInt(request.opportunity.amount)
);
instructions.push(flashBorrowIx);
}
// Compute budget
instructions.push(
ComputeBudgetProgram.setComputeUnitPrice({
microLamports: 1_000_000, // 1M micro-lamports = 1000 lamports
})
);
instructions.push(
ComputeBudgetProgram.setComputeUnitLimit({
units: 400_000,
})
);
// Swap instructions
for (const route of request.opportunity.routes) {
const swapIxs = await this.buildSwapInstructions(
request.wallet.address,
route
);
instructions.push(...swapIxs);
}
// Flash loan repay (if needed)
if (request.opportunity.useFlashLoan) {
const flashRepayIx = await this.buildFlashLoanRepayIx(
request.wallet.address,
request.opportunity.tokenA,
BigInt(request.opportunity.amount)
);
instructions.push(flashRepayIx);
}
// Build transaction
const tx = new Transaction();
tx.add(...instructions);
// Fetch recent blockhash
const { blockhash } = await this.connection.getLatestBlockhash();
tx.recentBlockhash = blockhash;
tx.feePayer = request.wallet.publicKey;
// Compress with ALT
const compressedTx = await this.compressWithALT(tx);
return compressedTx;
}
async submitBundle(tx: Transaction): Promise<string> {
const tipAccount = await this.jitoClient.getRandomTipAccount();
const tip = this.config.jitoTipBase + Math.floor(Math.random() * this.config.jitoTipStep);
const bundleId = await this.jitoClient.sendBundle({
transactions: [tx],
skipPreflightValidation: false,
});
this.logger.info({ bundleId, tip }, "Bundle submitted");
return bundleId;
}
async monitorBundle(bundleId: string, timeout: number): Promise<Confirmation> {
const startTime = Date.now();
while (Date.now() - startTime < timeout) {
const status = await this.jitoClient.getBundleStatus(bundleId);
if (status.confirmed) {
return {
signature: status.transactions[0],
slot: status.slot,
confirmed: true,
};
}
await sleep(2000); // Poll every 2 seconds
}
throw new Error(`Bundle confirmation timeout after ${timeout}ms`);
}
}
Executor Event Schemas
ExecutionResult
interface ExecutionResult {
id: string; // Opportunity ID
strategy: string;
status: "success" | "failed" | "timeout";
signature?: string;
bundleId?: string;
expectedProfit: string;
actualProfit?: string;
error?: string;
latency: number;
timestamp: number;
metadata: Record<string, any>;
}
// NATS subject: execution.results
Complete Data Flow Example
Arbitrage Trade from Detection to Confirmation
┌─────────────────────────────────────────────────────────────┐
│ 1. SCANNER: Price Feed Scanner │
│ - Polls DEX prices every 5 seconds │
│ - Detects SOL price difference between Raydium/Meteora │
│ - Publishes: market.prices.update │
└─────────────────────────────────────────────────────────────┘
↓
{
type: "price_update",
token: "So11111111...",
prices: {
raydium: 137.5,
meteora: 137.8
},
bestBid: 137.8,
bestAsk: 137.5,
spread: 0.22,
timestamp: 1709876543210
}
↓
┌─────────────────────────────────────────────────────────────┐
│ 2. PLANNER: Arbitrage Planner │
│ - Receives price update │
│ - Calculates: buy SOL on Raydium, sell on Meteora │
│ - Gets quotes (SolRoute: 3ms + 4ms) │
│ - Expected profit: 0.05 SOL (profitable!) │
│ - Publishes: trade.opportunities.arbitrage │
└─────────────────────────────────────────────────────────────┘
↓
{
id: "abc-123-def",
strategy: "arbitrage",
tokenA: "So111...",
tokenB: "EPjFW...",
amount: "1000000000",
expectedProfit: "50000000",
priority: 85,
routes: [...],
useFlashLoan: true,
urgency: "high",
expiresAt: 1709876548210
}
↓
┌─────────────────────────────────────────────────────────────┐
│ 3. EXECUTOR: Transaction Coordinator │
│ - Receives opportunity │
│ - Checks expiration (valid) │
│ - Selects Worker Wallet #7 (available) │
│ - Selects Jito Executor (high priority) │
│ - Routes to JitoExecutor │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 4. EXECUTOR: Jito Executor │
│ - Builds transaction with flash loan │
│ - Signs with Worker Wallet #7 │
│ - Submits bundle to Jito (tip: 5,500 lamports) │
│ - Monitors confirmation (polling every 2s) │
│ - Confirmed in slot 246382819 (12s) │
│ - Parses actual profit: 0.048 SOL │
│ - Publishes: execution.results │
└─────────────────────────────────────────────────────────────┘
↓
{
id: "abc-123-def",
strategy: "arbitrage",
status: "success",
signature: "5Kn3...",
bundleId: "bundle-xyz",
expectedProfit: "50000000",
actualProfit: "48000000",
latency: 12340,
timestamp: 1709876555550,
metadata: {
slot: 246382819,
executor: "jito"
}
}
↓
┌─────────────────────────────────────────────────────────────┐
│ 5. MONITORS: Metrics & Dashboards │
│ - Prometheus: arbitrage_profit_realized=0.048 SOL │
│ - Grafana: Updates P&L chart │
│ - Jaeger: Traces full 12.3s latency │
│ - Loki: Logs "Arbitrage success, profit 0.048 SOL" │
└─────────────────────────────────────────────────────────────┘
Best Practices
Event Naming
- Use hierarchical subjects:
{domain}.{entity}.{action} - Examples:
market.prices.update,trade.opportunities.arbitrage,execution.results
Event Versioning
interface Event {
version: string; // "v1", "v2", etc.
type: string;
data: any;
}
Error Handling
- Scanners: Log and continue (don’t crash on single error)
- Planners: NAK message for retry (up to 3 times)
- Executors: DLQ (dead letter queue) after 3 failures
Idempotency
- Use UUIDs for opportunity IDs
- Check Redis for duplicate processing
- Use transaction signatures for deduplication
Timeouts
- Scanner publishing: 5s
- Planner analysis: 10s
- Executor building: 5s
- Executor submission: 30s
Metrics
- Every event published/received
- Every error
- Latency at each stage
- Success rates
Testing
Unit Tests
describe("ArbitragePlanner", () => {
it("should detect profitable arbitrage", async () => {
const planner = new ArbitragePlanner(mockConfig);
const priceUpdate: PriceUpdateEvent = {
type: "price_update",
token: SOL_MINT,
prices: { raydium: 137.5, meteora: 138.0 },
bestBid: 138.0,
bestAsk: 137.5,
spread: 0.36,
timestamp: Date.now(),
};
const opportunities = await planner.analyze(priceUpdate);
expect(opportunities).toHaveLength(1);
expect(opportunities[0].expectedProfit).toBeGreaterThan("0");
});
});
Integration Tests
describe("Scanner → Planner Integration", () => {
it("should flow from scanner to planner", async () => {
const nats = await connect({ servers: TEST_NATS_URL });
const js = nats.jetstream();
const scanner = new PriceFeedScanner(config);
const planner = new ArbitragePlanner(config);
// Start planner
planner.start();
// Emit price update from scanner
await scanner.publish({
type: "price_update",
token: SOL_MINT,
prices: { raydium: 137.5, meteora: 138.0 },
bestBid: 138.0,
bestAsk: 137.5,
spread: 0.36,
timestamp: Date.now(),
});
// Wait for opportunity
const opportunity = await waitForEvent(
js,
"trade.opportunities.arbitrage",
5000
);
expect(opportunity).toBeDefined();
expect(opportunity.strategy).toBe("arbitrage");
});
});
End-to-End Tests
describe("Full Trading Loop", () => {
it("should execute profitable arbitrage", async () => {
// Setup: Start all components
const scanner = new PriceFeedScanner(config);
const planner = new ArbitragePlanner(config);
const coordinator = new TransactionCoordinator(config);
await Promise.all([
scanner.start(),
planner.start(),
coordinator.start(),
]);
// Trigger: Emit price update
await scanner.publish(mockPriceUpdate);
// Assert: Wait for execution result
const result = await waitForEvent(
nats.jetstream(),
"execution.results",
30000
);
expect(result.status).toBe("success");
expect(result.actualProfit).toBeGreaterThan("0");
}, 60000); // 60s timeout
});
This architecture provides a scalable, maintainable, and observable system for automated trading on Solana.
