您将构建什么

在本指南结束时,您将拥有一个工作中的 gRPC 流,可以实时监控 Solana 账户更新,并具有自动重新连接和错误处理功能。
1

选择您的访问方法

选择您希望如何访问 Yellowstone gRPC:

LaserStream

推荐给大多数用户
  • 多租户,高可用性
  • 自动故障转移和回填
  • 使用 API 密钥快速设置
  • 需要 Developer+ 计划(devnet),Professional+(mainnet)
获取 LaserStream 访问权限 →

专用节点

适用于高流量或自定义需求
  • 独占 gRPC 端点
  • 保证资源
获取专用节点 →
2

设置您的环境

创建一个新项目并安装依赖项:
mkdir solana-grpc-stream
cd solana-grpc-stream
npm init -y
npm install @triton-one/yellowstone-grpc bs58
npm install typescript ts-node @types/node --save-dev
npx tsc --init
3

获取您的凭据

获取您的 gRPC 端点和身份验证信息:
  1. dashboard.helius.dev 注册 Developer+ 计划(devnet)或 Professional 计划(mainnet)
  2. 从仪表板获取您的 API 密钥
  3. 选择您的区域端点:
Mainnet 端点:
  • 美国东部:https://laserstream-mainnet-ewr.helius-rpc.com
  • 美国西部:https://laserstream-mainnet-slc.helius-rpc.com
  • 欧洲:https://laserstream-mainnet-fra.helius-rpc.com
  • 亚洲:https://laserstream-mainnet-tyo.helius-rpc.com
Devnet: https://laserstream-devnet-ewr.helius-rpc.com
4

创建您的第一个流

使用以下完整示例创建一个强大的流管理器:
创建 stream-manager.ts:
import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";
import * as bs58 from 'bs58';

export class StreamManager {
  private client: Client;
  private stream: any;
  private isConnected = false;
  private reconnectAttempts = 0;
  private readonly maxReconnectAttempts = 10;
  private readonly baseReconnectDelay = 1000; // 1 second

  constructor(
    private endpoint: string,
    private apiKey: string,
    private onData: (data: any) => void,
    private onError?: (error: any) => void
  ) {
    this.client = new Client(endpoint, apiKey, {
      "grpc.max_receive_message_length": 64 * 1024 * 1024
    });
  }

  async connect(subscribeRequest: SubscribeRequest): Promise<void> {
    try {
      console.log(`Connecting to ${this.endpoint}...`);
      this.stream = await this.client.subscribe();
      this.isConnected = true;
      this.reconnectAttempts = 0;

      // Set up event handlers
      this.stream.on("data", this.handleData.bind(this));
      this.stream.on("error", this.handleStreamError.bind(this));
      this.stream.on("end", () => this.handleDisconnect(subscribeRequest));
      this.stream.on("close", () => this.handleDisconnect(subscribeRequest));

      // Send subscription request
      await this.writeRequest(subscribeRequest);
      
      // Start keepalive
      this.startKeepalive();
      
      console.log("✅ Connected and subscribed successfully");
    } catch (error) {
      console.error("Connection failed:", error);
      await this.reconnect(subscribeRequest);
    }
  }

  private async writeRequest(request: SubscribeRequest): Promise<void> {
    return new Promise((resolve, reject) => {
      this.stream.write(request, (err: any) => {
        if (err) reject(err);
        else resolve();
      });
    });
  }

  private handleData(data: any): void {
    try {
      // Convert buffers to readable format
      const processedData = this.processBuffers(data);
      this.onData(processedData);
    } catch (error) {
      console.error("Error processing data:", error);
    }
  }

  private processBuffers(obj: any): any {
    if (!obj) return obj;
    
    if (Buffer.isBuffer(obj) || obj instanceof Uint8Array) {
      return bs58.default.encode(obj);
    }
    
    if (Array.isArray(obj)) {
      return obj.map(item => this.processBuffers(item));
    }
    
    if (typeof obj === 'object') {
      return Object.fromEntries(
        Object.entries(obj).map(([k, v]) => [k, this.processBuffers(v)])
      );
    }
    
    return obj;
  }

  private handleStreamError(error: any): void {
    console.error("Stream error:", error);
    this.isConnected = false;
    if (this.onError) this.onError(error);
  }

  private async handleDisconnect(subscribeRequest: SubscribeRequest): Promise<void> {
    if (this.isConnected) {
      console.log("Stream disconnected, attempting to reconnect...");
      this.isConnected = false;
      await this.reconnect(subscribeRequest);
    }
  }

  private async reconnect(subscribeRequest: SubscribeRequest): Promise<void> {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached. Giving up.");
      return;
    }

    this.reconnectAttempts++;
    const delay = this.baseReconnectDelay * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5));
    
    console.log(`Reconnect attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms...`);
    
    setTimeout(() => {
      this.connect(subscribeRequest).catch(console.error);
    }, delay);
  }

  private startKeepalive(): void {
    setInterval(() => {
      if (this.isConnected) {
        const pingRequest: SubscribeRequest = {
          ping: { id: Date.now() },
          accounts: {},
          accountsDataSlice: [],
          transactions: {},
          slots: {},
          blocks: {},
          blocksMeta: {},
          entry: {},
          transactionsStatus: {}
        };
        
        this.writeRequest(pingRequest).catch(console.error);
      }
    }, 30000); // 30 seconds
  }

  disconnect(): void {
    if (this.stream) {
      this.stream.end();
    }
    this.client.close();
    this.isConnected = false;
  }
}
创建 main.ts:
import { StreamManager } from './stream-manager';
import { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";

// Configuration
const ENDPOINT = "your-grpc-endpoint"; // LaserStream or Dedicated Node endpoint
const API_KEY = "your-api-key";

// USDC Token Mint for example
const USDC_MINT = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v";

async function main() {
  const streamManager = new StreamManager(
    ENDPOINT,
    API_KEY,
    handleAccountUpdate,
    handleError
  );

  // Subscribe to USDC mint account updates
  const subscribeRequest: SubscribeRequest = {
    accounts: {
      accountSubscribe: {
        account: [USDC_MINT],
        owner: [],
        filters: []
      }
    },
    accountsDataSlice: [],
    commitment: CommitmentLevel.CONFIRMED,
    slots: {},
    transactions: {},
    transactionsStatus: {},
    blocks: {},
    blocksMeta: {},
    entry: {}
  };

  console.log("🚀 Starting USDC mint account monitoring...");
  await streamManager.connect(subscribeRequest);

  // Handle graceful shutdown
  process.on('SIGINT', () => {
    console.log('\n🛑 Shutting down...');
    streamManager.disconnect();
    process.exit(0);
  });
}

function handleAccountUpdate(data: any): void {
  if (data.account) {
    const account = data.account.account;
    console.log('\n📊 Account Update:');
    console.log(`  Account: ${account.pubkey}`);
    console.log(`  Owner: ${account.owner}`);
    console.log(`  Lamports: ${account.lamports}`);
    console.log(`  Data Length: ${account.data?.length || 0} bytes`);
    console.log(`  Slot: ${data.account.slot}`);
    console.log(`  Timestamp: ${new Date().toISOString()}`);
  }
  
  if (data.pong) {
    console.log(`💓 Keepalive pong received (id: ${data.pong.id})`);
  }
}

function handleError(error: any): void {
  console.error('❌ Stream error:', error.message);
}

main().catch(console.error);
运行你的流:
npx ts-node main.ts
5

测试你的流

运行你的应用程序并验证其工作:
  1. 启动你的流 使用适合你的语言的命令
  2. 在控制台中查找连接确认
  3. 等待账户更新 - 你应该会看到USDC铸币账户的定期更新
  4. 测试重新连接 通过暂时断开互联网连接
  5. 验证保活 通过每30秒观察一次pong消息
预期输出:
🚀 Connected! Monitoring USDC mint account...
✅ Connected and subscribed successfully
💓 Keepalive pong received (id: 1703123456789)

📊 Account Update:
  Account: EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v
  Owner: TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb
  Lamports: 1461600
  Data Length: 82 bytes
  Slot: 275123456
  Timestamp: 2024-01-15T10:30:45.123Z

接下来是什么?

现在你有了一个工作的gRPC流,探索这些监控指南:

故障排除

最佳实践

生产就绪检查清单:
  • ✅ 为重新连接实现指数退避
  • ✅ 每 30 秒使用保活 ping
  • ✅ 处理所有流事件(数据、错误、结束、关闭)
  • ✅ 异步处理数据以避免阻塞
  • ✅ 监控连接健康状况并在故障时发出警报
  • ✅ 根据您的用例使用适当的承诺级别
  • ✅ 尽可能具体地过滤数据以减少带宽