重要免责声明:仅限生产数据中心这些延迟测试仅为生产数据中心环境设计。不要在本地机器或消费者互联网连接上运行这些测试。本地带宽无法处理大量 Solana 订阅,将产生不反映真实性能的无意义结果。
同地部署要求:在您的 LaserStream 端点附近部署为了获得有意义的延迟测量,您必须将测试基础设施与所选的 LaserStream 端点部署在同一区域。网络距离将主导您的测量结果——从不同大陆进行测试将显示网络延迟,而不是 LaserStream 的性能。

理解分布式区块链系统中的延迟

在使用区块链流服务时,延迟测量变得复杂,因为分布式系统没有统一的时钟。与传统系统中可以测量到单个服务器的往返时间不同,区块链网络涉及多个验证者,每个验证者在不同时间接收和处理相同的交易。 基本挑战: 像 Solana 这样的区块链没有绝对时间的概念。每个全球的验证节点将在不同时间接收相同的交易,确认取决于集群中达到共识的百分比。这使得传统意义上的确定性延迟测量变得不可能。

承诺级别和延迟优先级

Solana 提供三个承诺级别,每个级别具有不同的延迟特性:
  • Processed: 最快,单个验证者确认(约400毫秒)
  • Confirmed: 中等,超多数确认(约2-3秒)
  • Finalized: 最慢,完整网络最终确认(约15-30秒)
对于对延迟敏感的应用程序,processed commitment 通常是目标。本指南中的所有测试都使用 processed commitment 级别,因为大多数高频使用场景优先考虑速度而非绝对最终性。

测量延迟的三种方法

1. 比较并行 gRPC 流

最可靠的方法 - 比较两个独立流到同一数据源,测量哪个流首先接收到相同的事件。 优点:
  • 消除时钟同步问题
  • 提供相对性能比较
  • 对于比较服务最准确

2. 本地时间戳与 created_at 比较

中等可靠性 - 测量系统接收到消息的时间与 LaserStream 服务在消息中嵌入的时间戳之间的差异。 限制:
  • 仅代表 LaserStream 内部创建消息的时间
  • 不会捕获到 LaserStream 的上游延迟
  • 对于真正的端到端延迟,准确性不如方法 1

3. 区块时间戳分析(不推荐)

不推荐 - 将本地接收时间与 Solana 的区块时间戳进行比较。 显著限制:
  • 区块时间戳只有秒级粒度
  • Solana 每 400ms 生成一个区块
  • 提供的有用信息极少

设置要求

区域共址

为了获得有意义的延迟测量,将测试基础设施部署在与 LaserStream 端点相同的数据中心或区域。 可用的 LaserStream 区域:
  • ewr: 美国纽约(东海岸) - https://laserstream-mainnet-ewr.helius-rpc.com
  • pitt: 美国匹兹堡(中部) - https://laserstream-mainnet-pitt.helius-rpc.com
  • slc: 美国盐湖城(西海岸) - https://laserstream-mainnet-slc.helius-rpc.com
  • ams: 欧洲阿姆斯特丹 - https://laserstream-mainnet-ams.helius-rpc.com
  • fra: 欧洲法兰克福 - https://laserstream-mainnet-fra.helius-rpc.com
  • tyo: 亚洲东京 - https://laserstream-mainnet-tyo.helius-rpc.com
  • sgp: 亚洲新加坡 - https://laserstream-mainnet-sgp.helius-rpc.com
对于 devnet 测试,使用:https://laserstream-devnet-ewr.helius-rpc.com 请参阅 LaserStream gRPC 文档 以获取完整的设置说明和端点选择指南。

Rust 环境设置

所有测量脚本都使用 Rust 和 Cargo。基本设置:
# Install Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# Setup project
cargo new latency-testing
cd latency-testing

# Add dependencies to Cargo.toml
[dependencies]
# Async runtime
tokio = { version = "1", features = ["full"] }
# Futures utilities for StreamExt, SinkExt
futures = "0.3"
# Environment variable loading
dotenvy = "0.15"
# Logging
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
# Yellowstone gRPC client and protocol
yellowstone-grpc-client = "8.0.0"
yellowstone-grpc-proto = "8.0.0"
# For timestamp analysis
prost-types = "0.12"
使用您的凭据创建一个 .env 文件:
YS_GRPC_URL=your-comparison-endpoint
YS_API_KEY=your-comparison-api-key
LS_GRPC_URL=your-laserstream-endpoint  
LS_API_KEY=your-helius-api-key
Helius Dashboard 获取您的 Helius API 密钥。LaserStream 需要至少开发者计划以访问 devnet 或专业计划以访问 mainnet。

方法 1:并行流比较

此脚本建立到不同 gRPC 端点的两个独立连接,并测量哪个首先接收到相同的 BlockMeta 消息。此方法通过使用相对时间消除了时钟同步问题。
use std::collections::HashMap;
use std::time::SystemTime;

use dotenvy::dotenv;
use futures::{StreamExt, SinkExt};
use tokio::sync::mpsc;
use tracing::{debug, error, info};

use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::prelude::{
    subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, 
    SubscribeRequestFilterBlocksMeta, SubscribeUpdate,
};

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Source {
    Yellowstone,
    Laserstream,
}

#[derive(Default, Debug)]
struct SlotTimings {
    ys_recv_ms: Option<i128>,
    ls_recv_ms: Option<i128>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let _ = dotenv();
    tracing_subscriber::fmt().with_env_filter("info").init();

    let ys_url = std::env::var("YS_GRPC_URL").expect("YS_GRPC_URL env variable not set");
    let ls_url = std::env::var("LS_GRPC_URL").expect("LS_GRPC_URL env variable not set");
    let ys_api_key = std::env::var("YS_API_KEY").ok();
    let ls_api_key = std::env::var("LS_API_KEY").ok();

    let commitment = CommitmentLevel::Processed;
    info!(?commitment, "Starting latency comparison");

    // Establish both clients
    let mut ys_client = GeyserGrpcClient::build_from_shared(ys_url.clone())
        .expect("invalid YS url")
        .x_token(ys_api_key.clone())?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .max_decoding_message_size(10 * 1024 * 1024)
        .connect()
        .await?;

    let mut ls_client = GeyserGrpcClient::build_from_shared(ls_url.clone())
        .expect("invalid LS url")
        .x_token(ls_api_key.clone())?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .max_decoding_message_size(10 * 1024 * 1024)
        .connect()
        .await?;

    let (mut ys_tx, mut ys_rx) = ys_client.subscribe().await?;
    let (mut ls_tx, mut ls_rx) = ls_client.subscribe().await?;
    
    let subscribe_request = SubscribeRequest {
        blocks_meta: {
            let mut m = HashMap::<String, SubscribeRequestFilterBlocksMeta>::new();
            m.insert("all".to_string(), SubscribeRequestFilterBlocksMeta::default());
            m
        },
        commitment: Some(commitment as i32),
        ..Default::default()
    };

    ys_tx.send(subscribe_request.clone()).await?;
    ls_tx.send(subscribe_request).await?;

    let (agg_tx, mut agg_rx) = mpsc::unbounded_channel::<(Source, u64, i128)>();

    // Spawn task for Yellowstone stream
    {
        let agg_tx = agg_tx.clone();
        tokio::spawn(async move {
            while let Some(update_res) = ys_rx.next().await {
                match update_res {
                    Ok(update) => handle_update(Source::Yellowstone, update, &agg_tx).await,
                    Err(e) => {
                        error!(target: "ys", "stream error: {:?}", e);
                        break;
                    }
                }
            }
        });
    }

    // Spawn task for Laserstream stream
    {
        let agg_tx = agg_tx.clone();
        tokio::spawn(async move {
            while let Some(update_res) = ls_rx.next().await {
                match update_res {
                    Ok(update) => handle_update(Source::Laserstream, update, &agg_tx).await,
                    Err(e) => {
                        error!(target: "ls", "stream error: {:?}", e);
                        break;
                    }
                }
            }
        });
    }

    // Aggregator – collect latencies per slot and print once we have both sources
    let mut timings: HashMap<u64, SlotTimings> = HashMap::new();
    let mut deltas: Vec<i128> = Vec::new();
    let mut count = 0;

    println!("slot,ys_recv_ms,ls_recv_ms,delta_ms");

    while let Some((source, slot, latency_ms)) = agg_rx.recv().await {
        let entry = timings.entry(slot).or_default();
        match source {
            Source::Yellowstone => entry.ys_recv_ms = Some(latency_ms),
            Source::Laserstream => entry.ls_recv_ms = Some(latency_ms),
        }

        if let (Some(ys), Some(ls)) = (entry.ys_recv_ms, entry.ls_recv_ms) {
            let delta = ys - ls; // positive => YS arrived later
            println!("{slot},{ys},{ls},{delta}");
            
            deltas.push(delta);
            count += 1;
            
            if count % 100 == 0 {
                print_statistics(&deltas, count);
            }
            
            timings.remove(&slot);
        }
    }

    Ok(())
}

async fn handle_update(
    source: Source,
    update: SubscribeUpdate,
    agg_tx: &mpsc::UnboundedSender<(Source, u64, i128)>,
) {
    if let Some(UpdateOneof::BlockMeta(block_meta)) = update.update_oneof {
        let slot = block_meta.slot;
        let recv_ms = system_time_to_millis(SystemTime::now());
        debug!(?source, slot, recv_ms, "BlockMeta received");
        let _ = agg_tx.send((source, slot, recv_ms));
    }
}

fn print_statistics(deltas: &[i128], count: usize) {
    if deltas.is_empty() {
        return;
    }
    
    let mut sorted_deltas = deltas.to_vec();
    sorted_deltas.sort();
    
    let median = if sorted_deltas.len() % 2 == 0 {
        let mid = sorted_deltas.len() / 2;
        (sorted_deltas[mid - 1] + sorted_deltas[mid]) / 2
    } else {
        sorted_deltas[sorted_deltas.len() / 2]
    };
    
    let min = *sorted_deltas.first().unwrap();
    let max = *sorted_deltas.last().unwrap();
    let sum: i128 = sorted_deltas.iter().sum();
    let mean = sum / sorted_deltas.len() as i128;
    
    let p25_idx = (sorted_deltas.len() as f64 * 0.25) as usize;
    let p75_idx = (sorted_deltas.len() as f64 * 0.75) as usize;
    let p95_idx = (sorted_deltas.len() as f64 * 0.95) as usize;
    
    let p25 = sorted_deltas[p25_idx.min(sorted_deltas.len() - 1)];
    let p75 = sorted_deltas[p75_idx.min(sorted_deltas.len() - 1)];
    let p95 = sorted_deltas[p95_idx.min(sorted_deltas.len() - 1)];
    
    eprintln!("--- Statistics after {} slots ---", count);
    eprintln!("Delta (YS - LS) in milliseconds:");
    eprintln!("  Min: {}, Max: {}", min, max);
    eprintln!("  Mean: {}, Median: {}", mean, median);
    eprintln!("  P25: {}, P75: {}, P95: {}", p25, p75, p95);
    eprintln!("  Positive deltas (YS slower): {}/{} ({:.1}%)", 
              sorted_deltas.iter().filter(|&&x| x > 0).count(),
              sorted_deltas.len(),
              sorted_deltas.iter().filter(|&&x| x > 0).count() as f64 / sorted_deltas.len() as f64 * 100.0);
    eprintln!("---");
}

fn system_time_to_millis(st: SystemTime) -> i128 {
    st.duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis() as i128
}
测量内容: 两个流服务之间的相对性能差异。增量显示哪个服务首先传递相同的槽信息。 关键指标:
  • 正增量:第一个服务 (YS) 比第二个服务 (LS) 慢 - LaserStream 更快
  • 负增量:第一个服务 (YS) 比第二个服务 (LS) 快 - LaserStream 更慢
  • 平均/中位数:平均性能差异
  • P95:95% 百分位延迟差异
运行测试:
cargo run --bin latency-comparison
示例输出:
slot,ys_recv_ms,ls_recv_ms,delta_ms
352416939,1752168399141,1752168399140,1
352416940,1752168399526,1752168399512,14
352416941,1752168399890,1752168399877,13
输出显示实时延迟差异和周期性统计数据。正的平均增量表示第二个服务 (LaserStream) 一贯更快地传递数据。

方法 2:创建时间戳分析

此方法将消息中嵌入的 created_at 时间戳与接收时的本地系统时间进行比较。
use std::time::{Duration, SystemTime};
use dotenvy::dotenv;
use tracing::{debug, error, info};
use yellowstone_grpc_proto::prost_types::Timestamp;
use futures::StreamExt;
use futures::SinkExt;
use std::collections::HashMap;

use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::prelude::{
    subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
    SubscribeRequestFilterTransactions, SubscribeUpdate,
};

const ACCOUNTS_INCLUDE: &[&str] = &["BB5dnY55FXS1e1NXqZDwCzgdYJdMCj3B92PU6Q5Fb6DT"];
const COMMITMENT_LEVEL: &str = "processed";

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let _ = dotenv();
    tracing_subscriber::fmt().with_env_filter("info").init();

    let grpc_url = std::env::var("YS_GRPC_URL").expect("GRPC_URL env variable not set");
    let api_key = std::env::var("YS_API_KEY").ok();

    info!("Connecting to {} …", grpc_url);
    debug!(accounts_include = ?ACCOUNTS_INCLUDE, "Subscribing with accountsInclude filter");

    let mut client = GeyserGrpcClient::build_from_shared(grpc_url.clone())
        .expect("invalid URL")
        .x_token(api_key.clone())?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .connect()
        .await?;

    let (mut subscribe_tx, mut subscribe_rx) = client.subscribe().await?;

    let mut tx_filter_map = HashMap::new();
    tx_filter_map.insert(
        "latency".to_string(),
        SubscribeRequestFilterTransactions {
            account_include: ACCOUNTS_INCLUDE.iter().map(|s| s.to_string()).collect(),
            vote: Some(false),
            failed: Some(false),
            ..Default::default()
        },
    );

    subscribe_tx
        .send(SubscribeRequest {
            transactions: tx_filter_map,
            commitment: Some(CommitmentLevel::Processed as i32),
            ..Default::default()
        })
        .await?;

    let mut latencies: Vec<i64> = Vec::new();
    let mut count = 0;

    while let Some(update_res) = subscribe_rx.next().await {
        match update_res {
            Ok(update) => {
                if let Some(UpdateOneof::Transaction(tx)) = update.update_oneof {
                    let recv_time = SystemTime::now();
                    
                    if let Some(created_at) = update.created_at {
                        let created_time = SystemTime::UNIX_EPOCH + Duration::new(
                            created_at.seconds as u64,
                            created_at.nanos as u32,
                        );
                        
                        if let Ok(latency) = recv_time.duration_since(created_time) {
                            let latency_ms = latency.as_millis() as i64;
                            latencies.push(latency_ms);
                            count += 1;
                            
                            println!("Transaction latency: {}ms", latency_ms);
                            
                            if count % 100 == 0 {
                                print_statistics(&latencies, count);
                            }
                        }
                    }
                }
            }
            Err(e) => {
                error!("Stream error: {:?}", e);
                break;
            }
        }
    }

    Ok(())
}

fn print_statistics(latencies: &[i64], count: usize) {
    if latencies.is_empty() {
        return;
    }
    
    let mut sorted = latencies.to_vec();
    sorted.sort();
    
    let median = if sorted.len() % 2 == 0 {
        let mid = sorted.len() / 2;
        (sorted[mid - 1] + sorted[mid]) / 2
    } else {
        sorted[sorted.len() / 2]
    };
    
    let min = *sorted.first().unwrap();
    let max = *sorted.last().unwrap();
    let sum: i64 = sorted.iter().sum();
    let mean = sum / sorted.len() as i64;
    
    let p95_idx = (sorted.len() as f64 * 0.95) as usize;
    let p95 = sorted[p95_idx.min(sorted.len() - 1)];
    
    eprintln!("--- Statistics after {} transactions ---", count);
    eprintln!("Latency (created_at to receive) in milliseconds:");
    eprintln!("  Min: {}, Max: {}", min, max);
    eprintln!("  Mean: {}, Median: {}", mean, median);
    eprintln!("  P95: {}", p95);
    eprintln!("---");
}
重要限制: 此方法仅测量从 LaserStream 创建消息到您接收消息的时间。不考虑区块链事件与 LaserStream 处理之间的任何上游延迟。 运行测试:
cargo run --bin timestamp-analysis
示例输出:
Transaction latency: 45ms
Transaction latency: 52ms
Transaction latency: 38ms
--- Statistics after 100 transactions ---
Latency (created_at to receive) in milliseconds:
  Min: 28, Max: 89
  Mean: 47, Median: 45
  P95: 72
---
此方法提供了LaserStream和您的应用程序之间网络和处理延迟的见解,但应与方法1结合使用以进行全面分析。

延迟测试的最佳实践

关键原则

  • 同地部署:在与您的LaserStream端点相同的区域部署测试以最小化网络延迟
  • 多种方法:使用并行流比较(方法1)作为主要指标,并辅以时间戳分析
  • 长期监控:长时间运行测试以捕获不同的网络条件和区块链拥堵
  • 统计分析:关注百分位数(P95, P99)而不仅仅是平均值,以了解尾部延迟

结果解读

  1. 建立基线:至少运行1小时的测试以在正常条件下建立基线性能
  2. 识别模式:寻找延迟峰值的模式——它们是否与高区块链活动或网络拥堵相关?
  3. 比较百分位数:P95延迟通常比平均延迟对用户体验更重要
  4. 监控一致性:一致的性能通常比绝对最小延迟更有价值
请记住,由于网络共识要求,区块链延迟本质上是可变的。关注相对性能差异和一致性,而不是绝对数值。