您将构建什么
在本指南结束时,您将拥有一个工作中的 gRPC 流,可以实时监控 Solana 账户更新,并具有自动重新连接和错误处理功能。1
选择您的访问方法
选择您希望如何访问 Yellowstone gRPC:
LaserStream
推荐给大多数用户
- 多租户,高可用性
- 自动故障转移和回填
- 使用 API 密钥快速设置
- 需要 Developer+ 计划(devnet),Professional+(mainnet)
专用节点
2
设置您的环境
创建一个新项目并安装依赖项:添加到
- TypeScript/JavaScript
- Rust
- Go
报告错误代码
复制
询问AI
mkdir solana-grpc-stream
cd solana-grpc-stream
npm init -y
npm install @triton-one/yellowstone-grpc bs58
npm install typescript ts-node @types/node --save-dev
npx tsc --init
报告错误代码
复制
询问AI
cargo new solana-grpc-stream
cd solana-grpc-stream
Cargo.toml:报告错误代码
复制
询问AI
[dependencies]
yellowstone-grpc-client = "1.13.0"
yellowstone-grpc-proto = "1.13.0"
tokio = { version = "1.0", features = ["full"] }
anyhow = "1.0"
futures = "0.3"
tonic = "0.10"
报告错误代码
复制
询问AI
mkdir solana-grpc-stream
cd solana-grpc-stream
go mod init solana-grpc-stream
go get github.com/rpcpool/yellowstone-grpc/examples/golang@latest
go get google.golang.org/[email protected]
3
获取您的凭据
获取您的 gRPC 端点和身份验证信息:
- LaserStream
- 专用节点
- 在 dashboard.helius.dev 注册 Developer+ 计划(devnet)或 Professional 计划(mainnet)
- 从仪表板获取您的 API 密钥
- 选择您的区域端点:
- 美国东部:
https://laserstream-mainnet-ewr.helius-rpc.com - 美国西部:
https://laserstream-mainnet-slc.helius-rpc.com - 欧洲:
https://laserstream-mainnet-fra.helius-rpc.com - 亚洲:
https://laserstream-mainnet-tyo.helius-rpc.com
https://laserstream-devnet-ewr.helius-rpc.com- 从 dashboard.helius.dev 订购专用节点
- 一旦配置完成,您将收到:
- 您的 gRPC 端点(通常为
your-node.rpc.helius.dev:2053) - 您的身份验证令牌
- 您的 gRPC 端点(通常为
4
创建您的第一个流
使用以下完整示例创建一个强大的流管理器:
- TypeScript
- Rust
- Go
创建 创建 运行你的流:
stream-manager.ts:报告错误代码
复制
询问AI
import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";
import * as bs58 from 'bs58';
export class StreamManager {
private client: Client;
private stream: any;
private isConnected = false;
private reconnectAttempts = 0;
private readonly maxReconnectAttempts = 10;
private readonly baseReconnectDelay = 1000; // 1 second
constructor(
private endpoint: string,
private apiKey: string,
private onData: (data: any) => void,
private onError?: (error: any) => void
) {
this.client = new Client(endpoint, apiKey, {
"grpc.max_receive_message_length": 64 * 1024 * 1024
});
}
async connect(subscribeRequest: SubscribeRequest): Promise<void> {
try {
console.log(`Connecting to ${this.endpoint}...`);
this.stream = await this.client.subscribe();
this.isConnected = true;
this.reconnectAttempts = 0;
// Set up event handlers
this.stream.on("data", this.handleData.bind(this));
this.stream.on("error", this.handleStreamError.bind(this));
this.stream.on("end", () => this.handleDisconnect(subscribeRequest));
this.stream.on("close", () => this.handleDisconnect(subscribeRequest));
// Send subscription request
await this.writeRequest(subscribeRequest);
// Start keepalive
this.startKeepalive();
console.log("✅ Connected and subscribed successfully");
} catch (error) {
console.error("Connection failed:", error);
await this.reconnect(subscribeRequest);
}
}
private async writeRequest(request: SubscribeRequest): Promise<void> {
return new Promise((resolve, reject) => {
this.stream.write(request, (err: any) => {
if (err) reject(err);
else resolve();
});
});
}
private handleData(data: any): void {
try {
// Convert buffers to readable format
const processedData = this.processBuffers(data);
this.onData(processedData);
} catch (error) {
console.error("Error processing data:", error);
}
}
private processBuffers(obj: any): any {
if (!obj) return obj;
if (Buffer.isBuffer(obj) || obj instanceof Uint8Array) {
return bs58.default.encode(obj);
}
if (Array.isArray(obj)) {
return obj.map(item => this.processBuffers(item));
}
if (typeof obj === 'object') {
return Object.fromEntries(
Object.entries(obj).map(([k, v]) => [k, this.processBuffers(v)])
);
}
return obj;
}
private handleStreamError(error: any): void {
console.error("Stream error:", error);
this.isConnected = false;
if (this.onError) this.onError(error);
}
private async handleDisconnect(subscribeRequest: SubscribeRequest): Promise<void> {
if (this.isConnected) {
console.log("Stream disconnected, attempting to reconnect...");
this.isConnected = false;
await this.reconnect(subscribeRequest);
}
}
private async reconnect(subscribeRequest: SubscribeRequest): Promise<void> {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error("Max reconnection attempts reached. Giving up.");
return;
}
this.reconnectAttempts++;
const delay = this.baseReconnectDelay * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5));
console.log(`Reconnect attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms...`);
setTimeout(() => {
this.connect(subscribeRequest).catch(console.error);
}, delay);
}
private startKeepalive(): void {
setInterval(() => {
if (this.isConnected) {
const pingRequest: SubscribeRequest = {
ping: { id: Date.now() },
accounts: {},
accountsDataSlice: [],
transactions: {},
slots: {},
blocks: {},
blocksMeta: {},
entry: {},
transactionsStatus: {}
};
this.writeRequest(pingRequest).catch(console.error);
}
}, 30000); // 30 seconds
}
disconnect(): void {
if (this.stream) {
this.stream.end();
}
this.client.close();
this.isConnected = false;
}
}
main.ts:报告错误代码
复制
询问AI
import { StreamManager } from './stream-manager';
import { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";
// Configuration
const ENDPOINT = "your-grpc-endpoint"; // LaserStream or Dedicated Node endpoint
const API_KEY = "your-api-key";
// USDC Token Mint for example
const USDC_MINT = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v";
async function main() {
const streamManager = new StreamManager(
ENDPOINT,
API_KEY,
handleAccountUpdate,
handleError
);
// Subscribe to USDC mint account updates
const subscribeRequest: SubscribeRequest = {
accounts: {
accountSubscribe: {
account: [USDC_MINT],
owner: [],
filters: []
}
},
accountsDataSlice: [],
commitment: CommitmentLevel.CONFIRMED,
slots: {},
transactions: {},
transactionsStatus: {},
blocks: {},
blocksMeta: {},
entry: {}
};
console.log("🚀 Starting USDC mint account monitoring...");
await streamManager.connect(subscribeRequest);
// Handle graceful shutdown
process.on('SIGINT', () => {
console.log('\n🛑 Shutting down...');
streamManager.disconnect();
process.exit(0);
});
}
function handleAccountUpdate(data: any): void {
if (data.account) {
const account = data.account.account;
console.log('\n📊 Account Update:');
console.log(` Account: ${account.pubkey}`);
console.log(` Owner: ${account.owner}`);
console.log(` Lamports: ${account.lamports}`);
console.log(` Data Length: ${account.data?.length || 0} bytes`);
console.log(` Slot: ${data.account.slot}`);
console.log(` Timestamp: ${new Date().toISOString()}`);
}
if (data.pong) {
console.log(`💓 Keepalive pong received (id: ${data.pong.id})`);
}
}
function handleError(error: any): void {
console.error('❌ Stream error:', error.message);
}
main().catch(console.error);
报告错误代码
复制
询问AI
npx ts-node main.ts
创建 运行你的流:
src/main.rs:报告错误代码
复制
询问AI
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::prelude::*;
use futures::StreamExt;
use std::collections::HashMap;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let endpoint = "your-grpc-endpoint"; // Replace with your endpoint
let token = Some("your-api-key".to_string()); // Replace with your API key
let mut client = GeyserGrpcClient::connect(endpoint, token, None).await?;
// USDC mint account
let usdc_mint = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v";
let mut accounts = HashMap::new();
accounts.insert(
"usdc_mint".to_string(),
SubscribeRequestFilterAccounts {
account: vec![usdc_mint.to_string()],
owner: vec![],
filters: vec![],
}
);
let mut stream = client.subscribe_once(
accounts,
HashMap::new(), // slots
HashMap::new(), // transactions
HashMap::new(), // blocks
HashMap::new(), // blocks_meta
None, // commitment
HashMap::new(), // accounts_data_slice
Some(CommitmentLevel::Confirmed),
HashMap::new(), // entry
).await?;
println!("🚀 Connected! Monitoring USDC mint account...");
while let Some(message) = stream.next().await {
match message {
Ok(msg) => {
if let Some(account) = msg.update_oneof {
match account {
subscribe_update::UpdateOneof::Account(account_update) => {
println!("\n📊 Account Update:");
println!(" Account: {}", account_update.account.as_ref()
.map(|a| &a.pubkey).unwrap_or(&"N/A".to_string()));
println!(" Lamports: {}", account_update.account.as_ref()
.map(|a| a.lamports).unwrap_or(0));
println!(" Slot: {}", account_update.slot);
}
_ => {} // Handle other update types as needed
}
}
}
Err(error) => {
eprintln!("❌ Stream error: {}", error);
sleep(Duration::from_secs(1)).await;
}
}
}
Ok(())
}
报告错误代码
复制
询问AI
cargo run
创建 运行你的流:
main.go:报告错误代码
复制
询问AI
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/rpcpool/yellowstone-grpc/examples/golang/pkg/grpc"
pb "github.com/rpcpool/yellowstone-grpc/examples/golang/pkg/proto"
"google.golang.org/grpc/metadata"
)
func main() {
endpoint := "your-grpc-endpoint" // Replace with your endpoint
apiKey := "your-api-key" // Replace with your API key
client, err := grpc.NewGrpcConnection(context.Background(), endpoint)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer client.Close()
// Add authentication
ctx := metadata.AppendToOutgoingContext(context.Background(), "x-token", apiKey)
// USDC mint account
usdcMint := "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"
stream, err := client.Subscribe(ctx)
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
// Send subscription request
request := &pb.SubscribeRequest{
Accounts: map[string]*pb.SubscribeRequestFilterAccounts{
"usdc_mint": {
Account: []string{usdcMint},
Owner: []string{},
Filters: []*pb.SubscribeRequestFilterAccountsFilter{},
},
},
Commitment: pb.CommitmentLevel_CONFIRMED,
}
if err := stream.Send(request); err != nil {
log.Fatalf("Failed to send request: %v", err)
}
fmt.Println("🚀 Connected! Monitoring USDC mint account...")
for {
response, err := stream.Recv()
if err != nil {
log.Printf("❌ Stream error: %v", err)
time.Sleep(time.Second)
continue
}
if account := response.GetAccount(); account != nil {
fmt.Printf("\n📊 Account Update:\n")
fmt.Printf(" Account: %s\n", account.Account.Pubkey)
fmt.Printf(" Lamports: %d\n", account.Account.Lamports)
fmt.Printf(" Slot: %d\n", account.Slot)
fmt.Printf(" Timestamp: %s\n", time.Now().Format(time.RFC3339))
}
}
}
报告错误代码
复制
询问AI
go run main.go
5
测试你的流
运行你的应用程序并验证其工作:
- 启动你的流 使用适合你的语言的命令
- 在控制台中查找连接确认
- 等待账户更新 - 你应该会看到USDC铸币账户的定期更新
- 测试重新连接 通过暂时断开互联网连接
- 验证保活 通过每30秒观察一次pong消息
报告错误代码
复制
询问AI
🚀 Connected! Monitoring USDC mint account...
✅ Connected and subscribed successfully
💓 Keepalive pong received (id: 1703123456789)
📊 Account Update:
Account: EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v
Owner: TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb
Lamports: 1461600
Data Length: 82 bytes
Slot: 275123456
Timestamp: 2024-01-15T10:30:45.123Z
接下来是什么?
现在你有了一个工作的gRPC流,探索这些监控指南:故障排除
连接问题
连接问题
症状: 连接超时,身份验证错误解决方案:
- 验证你的端点URL和API密钥
- 检查你的计划是否包括gRPC访问
- 确保你使用的是正确的端口(通常为专用节点的2053)
- 对于LaserStream,确保你至少有开发者计划(devnet)或专业计划(mainnet)
未接收到数据
未接收到数据
症状: 流连接成功但没有账户更新出现解决方案:
- USDC 铸币更新不频繁 - 尝试监控一个更活跃的账户
- 检查您的承诺级别(尝试
PROCESSED以获取更频繁的更新) - 验证您的账户过滤器配置
- 监控一个代币账户而不是铸币以获得更多活动
流断开连接
流断开连接
症状: 频繁断开连接,重新连接循环解决方案:
- 实现指数退避(包含在上面的示例中)
- 检查网络稳定性
- 确保保活 ping 正常工作(每 30 秒一次)
- 监控服务器端速率限制
最佳实践
生产就绪检查清单:
- ✅ 为重新连接实现指数退避
- ✅ 每 30 秒使用保活 ping
- ✅ 处理所有流事件(数据、错误、结束、关闭)
- ✅ 异步处理数据以避免阻塞
- ✅ 监控连接健康状况并在故障时发出警报
- ✅ 根据您的用例使用适当的承诺级别
- ✅ 尽可能具体地过滤数据以减少带宽