Learn how to properly measure and analyze latency for gRPC streams using various testing methods.
CRITICAL DISCLAIMER: Production Data Center Only
These latency tests are designed for production data center environments only. DO NOT run these tests on local machines or consumer internet connections. Local bandwidth cannot handle heavy Solana subscriptions and will produce meaningless results that don’t reflect real-world performance.
CO-LOCATION REQUIREMENT: Deploy Near Your LaserStream Endpoint
For meaningful latency measurements, you must co-locate your testing infrastructure in the same region as your chosen LaserStream endpoint. Network distance will dominate your measurements - testing from a different continent will show network latency, not LaserStream performance.
Understanding latency in distributed blockchain systems
When working with blockchain streaming services, latency measurement becomes complex because distributed systems don’t have a universal clock. Unlike traditional systems where you can measure round-trip time to a single server, blockchain networks involve multiple validators, each receiving and processing the same transaction at different times.
The fundamental challenge: Blockchains like Solana don’t have a concept of absolute time. Each validator node globally will receive the same transaction at different times, and confirmation depends on a percentage of the cluster reaching consensus. This makes deterministic latency measurement impossible in the traditional sense.
For latency-sensitive applications, the processed commitment level is typically the target. All tests in this guide use processed commitment, since most high-frequency use cases prioritize speed over absolute finality.
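For reference, both test programs below request this commitment level on their Yellowstone gRPC subscriptions. A minimal sketch of that part of the request (not a complete subscription; the field values mirror those used in the full programs):

use yellowstone_grpc_proto::prelude::{CommitmentLevel, SubscribeRequest};

// Build a subscribe request that asks for processed-commitment updates.
// The commitment field carries the numeric value of the CommitmentLevel enum.
fn processed_subscribe_request() -> SubscribeRequest {
    SubscribeRequest {
        commitment: Some(CommitmentLevel::Processed as i32),
        ..Default::default()
    }
}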
Method 2 (created_at timestamp analysis) offers moderate reliability: it measures the difference between when your system receives a message and the created_at timestamp that the LaserStream service embeds in the message.
Limitations:
Only represents when LaserStream created the message internally
Upstream delays to LaserStream won’t be captured
Less accurate than Method 1 for true end-to-end latency
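For orientation, the core computation behind this method, shown in full in the Method 2 program later in this guide, reduces to converting the message's created_at protobuf timestamp into a SystemTime and subtracting it from the local receive time. A condensed sketch:

use std::time::{Duration, SystemTime};
use yellowstone_grpc_proto::prost_types::Timestamp;

// Difference between the local receive time and the server-side created_at
// timestamp, in milliseconds; returns None if the subtraction would be
// negative (e.g. because of clock skew).
fn created_at_latency_ms(created_at: &Timestamp) -> Option<i64> {
    let created_time = SystemTime::UNIX_EPOCH
        + Duration::new(created_at.seconds as u64, created_at.nanos as u32);
    SystemTime::now()
        .duration_since(created_time)
        .ok()
        .map(|d| d.as_millis() as i64)
}

Because this compares your machine's clock against LaserStream's, any clock skew between the two directly biases the result.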
Method 1 (dual-endpoint comparison): this script establishes two independent connections to different gRPC endpoints and measures which one receives the same BlockMeta messages first. Because it relies only on relative arrival times, this approach eliminates clock-synchronization issues.
use std::collections::HashMap;
use std::time::SystemTime;

use dotenvy::dotenv;
use futures::{StreamExt, SinkExt};
use tokio::sync::mpsc;
use tracing::{debug, error, info};
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::prelude::{
    subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
    SubscribeRequestFilterBlocksMeta, SubscribeUpdate,
};

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Source {
    Yellowstone,
    Laserstream,
}

#[derive(Default, Debug)]
struct SlotTimings {
    ys_recv_ms: Option<i128>,
    ls_recv_ms: Option<i128>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let _ = dotenv();
    tracing_subscriber::fmt().with_env_filter("info").init();

    let ys_url = std::env::var("YS_GRPC_URL").expect("YS_GRPC_URL env variable not set");
    let ls_url = std::env::var("LS_GRPC_URL").expect("LS_GRPC_URL env variable not set");
    let ys_api_key = std::env::var("YS_API_KEY").ok();
    let ls_api_key = std::env::var("LS_API_KEY").ok();

    let commitment = CommitmentLevel::Processed;
    info!(?commitment, "Starting latency comparison");

    // Establish both clients
    let mut ys_client = GeyserGrpcClient::build_from_shared(ys_url.clone())
        .expect("invalid YS url")
        .x_token(ys_api_key.clone())?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .max_decoding_message_size(10 * 1024 * 1024)
        .connect()
        .await?;

    let mut ls_client = GeyserGrpcClient::build_from_shared(ls_url.clone())
        .expect("invalid LS url")
        .x_token(ls_api_key.clone())?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .max_decoding_message_size(10 * 1024 * 1024)
        .connect()
        .await?;

    let (mut ys_tx, mut ys_rx) = ys_client.subscribe().await?;
    let (mut ls_tx, mut ls_rx) = ls_client.subscribe().await?;

    let subscribe_request = SubscribeRequest {
        blocks_meta: {
            let mut m = HashMap::<String, SubscribeRequestFilterBlocksMeta>::new();
            m.insert("all".to_string(), SubscribeRequestFilterBlocksMeta::default());
            m
        },
        commitment: Some(commitment as i32),
        ..Default::default()
    };

    ys_tx.send(subscribe_request.clone()).await?;
    ls_tx.send(subscribe_request).await?;

    let (agg_tx, mut agg_rx) = mpsc::unbounded_channel::<(Source, u64, i128)>();

    // Spawn task for Yellowstone stream
    {
        let agg_tx = agg_tx.clone();
        tokio::spawn(async move {
            while let Some(update_res) = ys_rx.next().await {
                match update_res {
                    Ok(update) => handle_update(Source::Yellowstone, update, &agg_tx).await,
                    Err(e) => {
                        error!(target: "ys", "stream error: {:?}", e);
                        break;
                    }
                }
            }
        });
    }

    // Spawn task for Laserstream stream
    {
        let agg_tx = agg_tx.clone();
        tokio::spawn(async move {
            while let Some(update_res) = ls_rx.next().await {
                match update_res {
                    Ok(update) => handle_update(Source::Laserstream, update, &agg_tx).await,
                    Err(e) => {
                        error!(target: "ls", "stream error: {:?}", e);
                        break;
                    }
                }
            }
        });
    }

    // Aggregator – collect latencies per slot and print once we have both sources
    let mut timings: HashMap<u64, SlotTimings> = HashMap::new();
    let mut deltas: Vec<i128> = Vec::new();
    let mut count = 0;

    println!("slot,ys_recv_ms,ls_recv_ms,delta_ms");

    while let Some((source, slot, latency_ms)) = agg_rx.recv().await {
        let entry = timings.entry(slot).or_default();
        match source {
            Source::Yellowstone => entry.ys_recv_ms = Some(latency_ms),
            Source::Laserstream => entry.ls_recv_ms = Some(latency_ms),
        }

        if let (Some(ys), Some(ls)) = (entry.ys_recv_ms, entry.ls_recv_ms) {
            let delta = ys - ls; // positive => YS arrived later
            println!("{slot},{ys},{ls},{delta}");
            deltas.push(delta);
            count += 1;

            if count % 100 == 0 {
                print_statistics(&deltas, count);
            }

            timings.remove(&slot);
        }
    }

    Ok(())
}

async fn handle_update(
    source: Source,
    update: SubscribeUpdate,
    agg_tx: &mpsc::UnboundedSender<(Source, u64, i128)>,
) {
    if let Some(UpdateOneof::BlockMeta(block_meta)) = update.update_oneof {
        let slot = block_meta.slot;
        let recv_ms = system_time_to_millis(SystemTime::now());
        debug!(?source, slot, recv_ms, "BlockMeta received");
        let _ = agg_tx.send((source, slot, recv_ms));
    }
}

fn print_statistics(deltas: &[i128], count: usize) {
    if deltas.is_empty() {
        return;
    }

    let mut sorted_deltas = deltas.to_vec();
    sorted_deltas.sort();

    let median = if sorted_deltas.len() % 2 == 0 {
        let mid = sorted_deltas.len() / 2;
        (sorted_deltas[mid - 1] + sorted_deltas[mid]) / 2
    } else {
        sorted_deltas[sorted_deltas.len() / 2]
    };

    let min = *sorted_deltas.first().unwrap();
    let max = *sorted_deltas.last().unwrap();
    let sum: i128 = sorted_deltas.iter().sum();
    let mean = sum / sorted_deltas.len() as i128;

    let p25_idx = (sorted_deltas.len() as f64 * 0.25) as usize;
    let p75_idx = (sorted_deltas.len() as f64 * 0.75) as usize;
    let p95_idx = (sorted_deltas.len() as f64 * 0.95) as usize;
    let p25 = sorted_deltas[p25_idx.min(sorted_deltas.len() - 1)];
    let p75 = sorted_deltas[p75_idx.min(sorted_deltas.len() - 1)];
    let p95 = sorted_deltas[p95_idx.min(sorted_deltas.len() - 1)];

    eprintln!("--- Statistics after {} slots ---", count);
    eprintln!("Delta (YS - LS) in milliseconds:");
    eprintln!(" Min: {}, Max: {}", min, max);
    eprintln!(" Mean: {}, Median: {}", mean, median);
    eprintln!(" P25: {}, P75: {}, P95: {}", p25, p75, p95);
    eprintln!(
        " Positive deltas (YS slower): {}/{} ({:.1}%)",
        sorted_deltas.iter().filter(|&&x| x > 0).count(),
        sorted_deltas.len(),
        sorted_deltas.iter().filter(|&&x| x > 0).count() as f64 / sorted_deltas.len() as f64 * 100.0
    );
    eprintln!("---");
}

fn system_time_to_millis(st: SystemTime) -> i128 {
    st.duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis() as i128
}
What this measures: The relative performance difference between two streaming services. The delta shows which service delivers the same slot information first.
Key metrics:
Positive delta: The first endpoint (YS) received the slot later than the second (LS), so LaserStream was faster for that slot
Negative delta: The first endpoint (YS) received the slot earlier, so LaserStream was slower
The program emits one CSV row per slot on stdout (slot, both receive timestamps in milliseconds since the Unix epoch, and their difference) and periodic statistics on stderr, so you can redirect stdout to a file without losing the live summaries. For example, a delta of +70 means LaserStream delivered that slot 70 ms earlier; a consistently positive mean delta indicates LaserStream delivers data faster overall.
Method 2 (created_at timestamp analysis): this approach compares the created_at timestamp embedded in each message against your local system time at the moment you receive it.
use std::time::{Duration, SystemTime};
use dotenvy::dotenv;
use tracing::{debug, error, info};
use yellowstone_grpc_proto::prost_types::Timestamp;
use futures::StreamExt;
use futures::SinkExt;
use std::collections::HashMap;
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::prelude::{
    subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
    SubscribeRequestFilterTransactions, SubscribeUpdate,
};

const ACCOUNTS_INCLUDE: &[&str] = &["BB5dnY55FXS1e1NXqZDwCzgdYJdMCj3B92PU6Q5Fb6DT"];
const COMMITMENT_LEVEL: &str = "processed";

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let _ = dotenv();
    tracing_subscriber::fmt().with_env_filter("info").init();

    let grpc_url = std::env::var("YS_GRPC_URL").expect("YS_GRPC_URL env variable not set");
    let api_key = std::env::var("YS_API_KEY").ok();

    info!("Connecting to {} …", grpc_url);
    debug!(accounts_include = ?ACCOUNTS_INCLUDE, "Subscribing with accountsInclude filter");

    let mut client = GeyserGrpcClient::build_from_shared(grpc_url.clone())
        .expect("invalid URL")
        .x_token(api_key.clone())?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .connect()
        .await?;

    let (mut subscribe_tx, mut subscribe_rx) = client.subscribe().await?;

    let mut tx_filter_map = HashMap::new();
    tx_filter_map.insert(
        "latency".to_string(),
        SubscribeRequestFilterTransactions {
            account_include: ACCOUNTS_INCLUDE.iter().map(|s| s.to_string()).collect(),
            vote: Some(false),
            failed: Some(false),
            ..Default::default()
        },
    );

    subscribe_tx
        .send(SubscribeRequest {
            transactions: tx_filter_map,
            commitment: Some(CommitmentLevel::Processed as i32),
            ..Default::default()
        })
        .await?;

    let mut latencies: Vec<i64> = Vec::new();
    let mut count = 0;

    while let Some(update_res) = subscribe_rx.next().await {
        match update_res {
            Ok(update) => {
                if let Some(UpdateOneof::Transaction(tx)) = update.update_oneof {
                    let recv_time = SystemTime::now();
                    if let Some(created_at) = update.created_at {
                        let created_time = SystemTime::UNIX_EPOCH
                            + Duration::new(created_at.seconds as u64, created_at.nanos as u32);
                        if let Ok(latency) = recv_time.duration_since(created_time) {
                            let latency_ms = latency.as_millis() as i64;
                            latencies.push(latency_ms);
                            count += 1;
                            println!("Transaction latency: {}ms", latency_ms);
                            if count % 100 == 0 {
                                print_statistics(&latencies, count);
                            }
                        }
                    }
                }
            }
            Err(e) => {
                error!("Stream error: {:?}", e);
                break;
            }
        }
    }

    Ok(())
}

fn print_statistics(latencies: &[i64], count: usize) {
    if latencies.is_empty() {
        return;
    }

    let mut sorted = latencies.to_vec();
    sorted.sort();

    let median = if sorted.len() % 2 == 0 {
        let mid = sorted.len() / 2;
        (sorted[mid - 1] + sorted[mid]) / 2
    } else {
        sorted[sorted.len() / 2]
    };

    let min = *sorted.first().unwrap();
    let max = *sorted.last().unwrap();
    let sum: i64 = sorted.iter().sum();
    let mean = sum / sorted.len() as i64;

    let p95_idx = (sorted.len() as f64 * 0.95) as usize;
    let p95 = sorted[p95_idx.min(sorted.len() - 1)];

    eprintln!("--- Statistics after {} transactions ---", count);
    eprintln!("Latency (created_at to receive) in milliseconds:");
    eprintln!(" Min: {}, Max: {}", min, max);
    eprintln!(" Mean: {}, Median: {}", mean, median);
    eprintln!(" P95: {}", p95);
    eprintln!("---");
}
Important limitation: This method only measures from when LaserStream created the message to when you received it. It doesn’t account for any upstream delays between the blockchain event and LaserStream processing.
Running the test:
cargo run --bin timestamp-analysis
Sample output:
Transaction latency: 45ms
Transaction latency: 52ms
Transaction latency: 38ms
--- Statistics after 100 transactions ---
Latency (created_at to receive) in milliseconds:
 Min: 28, Max: 89
 Mean: 47, Median: 45
 P95: 72
---
This method provides insights into the network and processing latency between LaserStream and your application, but should be used in conjunction with Method 1 for comprehensive analysis.
Establish a baseline: Run tests for at least one hour to capture performance under normal conditions
Identify patterns: Check whether latency spikes correlate with high blockchain activity or network congestion
Compare percentiles: P95 latency often matters more than average latency for user experience
Monitor consistency: Consistent performance is often more valuable than absolute minimum latency (see the percentile sketch after this list)
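As referenced in the list above, here is a minimal sketch of a percentile and consistency summary over a collected latency sample. It uses the same nearest-index approach as the print_statistics helpers in the test programs; the P95/P50 spread ratio is an illustrative consistency metric, not something the original programs report.

// Nearest-rank percentile over an already sorted sample.
fn percentile(sorted: &[i64], p: f64) -> i64 {
    let idx = (sorted.len() as f64 * p) as usize;
    sorted[idx.min(sorted.len() - 1)]
}

// Print P50/P95/P99 plus a simple spread ratio: values near 1.0 indicate
// consistent delivery, while large values mean tail spikes dominate even if
// the median looks good.
fn summarize(latencies: &[i64]) {
    if latencies.is_empty() {
        return;
    }
    let mut sorted = latencies.to_vec();
    sorted.sort();
    let p50 = percentile(&sorted, 0.50);
    let p95 = percentile(&sorted, 0.95);
    let p99 = percentile(&sorted, 0.99);
    println!(
        "p50={}ms p95={}ms p99={}ms spread(p95/p50)={:.2}",
        p50,
        p95,
        p99,
        p95 as f64 / p50.max(1) as f64
    );
}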
Remember that blockchain latency is inherently variable due to network consensus requirements. Focus on relative performance differences and consistency over absolute numbers.