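# Solana Yellowstone gRPC Quickstart: Real-Time Data Streaming

> A complete setup guide for streaming real-time Solana data over Yellowstone gRPC, from installation to your first working stream with automatic reconnection.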

<Note>
  This guide documents the open **Yellowstone gRPC** protocol. For the Helius-managed equivalent with replay, automatic reconnection, and the `helius-laserstream` SDK, see [LaserStream gRPC](/zh/laserstream/grpc).
</Note>

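## What You'll Build

By the end of this guide, you'll have a working gRPC stream that monitors Solana account updates in real time, with automatic reconnection and error handling.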

<Steps>
  <Step title="Choose your access method">
    Choose how you want to access Yellowstone gRPC:

    <CardGroup cols={2}>
      <Card title="LaserStream" icon="rocket">
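        **Recommended for most users**

        * Multi-tenant, highly available
        * Automatic failover and backfill
        * Quick setup with an API key
        * Available on all plans (devnet); Business+ required (mainnet)

        [Get LaserStream access →](/zh/laserstream/)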
      </Card>

      <Card title="Dedicated Nodes" icon="server">
        **For high-volume or custom requirements**

        * Exclusive gRPC endpoint
        * Guaranteed resources

        [Get a Dedicated Node →](/zh/dedicated-nodes/)
      </Card>
    </CardGroup>
  </Step>

  <Step title="Set up your environment">
    Create a new project and install the dependencies:

    <Tabs>
      <Tab title="TypeScript/JavaScript">
        ```bash theme={"system"}
        mkdir solana-grpc-stream
        cd solana-grpc-stream
        npm init -y
        npm install @triton-one/yellowstone-grpc bs58
        npm install typescript ts-node @types/node --save-dev
        npx tsc --init
        ```
      </Tab>

      <Tab title="Rust">
        ```bash theme={"system"}
        cargo new solana-grpc-stream
        cd solana-grpc-stream
        ```

        Add to `Cargo.toml`:

        ```toml theme={"system"}
        [dependencies]
        yellowstone-grpc-client = "1.13.0"
        yellowstone-grpc-proto = "1.13.0"
        tokio = { version = "1.0", features = ["full"] }
        anyhow = "1.0"
        futures = "0.3"
        tonic = "0.10"
        ```
      </Tab>

      <Tab title="Go">
        ```bash theme={"system"}
        mkdir solana-grpc-stream
        cd solana-grpc-stream
        go mod init solana-grpc-stream
        go get github.com/rpcpool/yellowstone-grpc/examples/golang@latest
        go get google.golang.org/grpc@v1.67.1
        ```
      </Tab>
    </Tabs>
  </Step>

  <Step title="Get your credentials">
    Get your gRPC endpoint and authentication details:

    <Tabs>
      <Tab title="LaserStream">
        1. Sign up at [dashboard.helius.dev](https://dashboard.helius.dev) (devnet is available on all plans; mainnet requires Business+)
        2. Get your API key from the dashboard
        3. Choose your regional endpoint:

        **Mainnet endpoints:**

        * US East: `https://laserstream-mainnet-ewr.helius-rpc.com`
        * US West: `https://laserstream-mainnet-slc.helius-rpc.com`
        * Europe: `https://laserstream-mainnet-fra.helius-rpc.com`
        * Asia: `https://laserstream-mainnet-tyo.helius-rpc.com`

        **Devnet:** `https://laserstream-devnet-ewr.helius-rpc.com`
      </Tab>

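      <Tab title="Dedicated Nodes">
        1. Order a Dedicated Node from [dashboard.helius.dev](https://dashboard.helius.dev/dedicated-nodes)
        2. Once your node is provisioned, you'll receive:
           * Your gRPC endpoint (typically `your-node.rpc.helius.dev:2053`)
           * Your authentication token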
      </Tab>
    </Tabs>
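
    Once you have these values, one option is to keep them out of your source code and read them from the environment. The sketch below assumes two hypothetical variable names, `GRPC_ENDPOINT` and `GRPC_API_KEY`; neither Yellowstone nor Helius defines them, so rename them to suit your setup.

    ```typescript
    // Hypothetical variable names; nothing in Yellowstone or Helius requires them.
    interface StreamConfig {
      endpoint: string;
      apiKey: string;
    }

    // Reads the endpoint and API key from an environment-like object and
    // fails fast with a clear message when either one is missing.
    function loadConfig(env: Record<string, string | undefined>): StreamConfig {
      const endpoint = env["GRPC_ENDPOINT"]?.trim();
      const apiKey = env["GRPC_API_KEY"]?.trim();
      if (!endpoint) throw new Error("GRPC_ENDPOINT is not set");
      if (!apiKey) throw new Error("GRPC_API_KEY is not set");
      return { endpoint, apiKey };
    }

    // In a Node.js program you would pass process.env instead of this literal.
    const config = loadConfig({
      GRPC_ENDPOINT: "https://laserstream-devnet-ewr.helius-rpc.com",
      GRPC_API_KEY: "YOUR_API_KEY",
    });
    console.log(`Using endpoint ${config.endpoint}`);
    ```

    Failing at startup when a value is missing tends to be easier to diagnose than an authentication error that surfaces later on the stream.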
  </Step>

  <Step title="Create your first stream">
    Create a robust stream manager using the complete example below:

    <Tabs>
      <Tab title="TypeScript">
        Create `stream-manager.ts`:

        ```typescript theme={"system"}
        import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";
        import * as bs58 from 'bs58';

        export class StreamManager {
          private client: Client;
          private stream: any;
          private isConnected = false;
          private reconnectAttempts = 0;
          private readonly maxReconnectAttempts = 10;
          private readonly baseReconnectDelay = 1000; // 1 second

          constructor(
            private endpoint: string,
            private apiKey: string,
            private onData: (data: any) => void,
            private onError?: (error: any) => void
          ) {
            this.client = new Client(endpoint, apiKey, {
              "grpc.max_receive_message_length": 64 * 1024 * 1024
            });
          }

          async connect(subscribeRequest: SubscribeRequest): Promise<void> {
            try {
              console.log(`Connecting to ${this.endpoint}...`);
              this.stream = await this.client.subscribe();
              this.isConnected = true;
              this.reconnectAttempts = 0;

              // Set up event handlers
              this.stream.on("data", this.handleData.bind(this));
              this.stream.on("error", this.handleStreamError.bind(this));
              this.stream.on("end", () => this.handleDisconnect(subscribeRequest));
              this.stream.on("close", () => this.handleDisconnect(subscribeRequest));

              // Send subscription request
              await this.writeRequest(subscribeRequest);
              
              // Start keepalive
              this.startKeepalive();
              
              console.log("✅ Connected and subscribed successfully");
            } catch (error) {
              console.error("Connection failed:", error);
              await this.reconnect(subscribeRequest);
            }
          }

          private async writeRequest(request: SubscribeRequest): Promise<void> {
            return new Promise((resolve, reject) => {
              this.stream.write(request, (err: any) => {
                if (err) reject(err);
                else resolve();
              });
            });
          }

          private handleData(data: any): void {
            try {
              // Convert buffers to readable format
              const processedData = this.processBuffers(data);
              this.onData(processedData);
            } catch (error) {
              console.error("Error processing data:", error);
            }
          }

          private processBuffers(obj: any): any {
            if (!obj) return obj;
            
            if (Buffer.isBuffer(obj) || obj instanceof Uint8Array) {
              return bs58.default.encode(obj);
            }
            
            if (Array.isArray(obj)) {
              return obj.map(item => this.processBuffers(item));
            }
            
            if (typeof obj === 'object') {
              return Object.fromEntries(
                Object.entries(obj).map(([k, v]) => [k, this.processBuffers(v)])
              );
            }
            
            return obj;
          }

          private handleStreamError(error: any): void {
            console.error("Stream error:", error);
            this.isConnected = false;
            if (this.onError) this.onError(error);
          }

          private async handleDisconnect(subscribeRequest: SubscribeRequest): Promise<void> {
            if (this.isConnected) {
              console.log("Stream disconnected, attempting to reconnect...");
              this.isConnected = false;
              await this.reconnect(subscribeRequest);
            }
          }

          private async reconnect(subscribeRequest: SubscribeRequest): Promise<void> {
            if (this.reconnectAttempts >= this.maxReconnectAttempts) {
              console.error("Max reconnection attempts reached. Giving up.");
              return;
            }

            this.reconnectAttempts++;
            const delay = this.baseReconnectDelay * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5));
            
            console.log(`Reconnect attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms...`);
            
            setTimeout(() => {
              this.connect(subscribeRequest).catch(console.error);
            }, delay);
          }

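          private startKeepalive(): void {
            // Tie this timer to the stream it was started for, so that
            // reconnects do not leave several keepalive intervals running
            const stream = this.stream;
            const timer = setInterval(() => {
              if (this.stream !== stream || !this.isConnected) {
                clearInterval(timer);
                return;
              }

              const pingRequest: SubscribeRequest = {
                ping: { id: Date.now() },
                accounts: {},
                accountsDataSlice: [],
                transactions: {},
                slots: {},
                blocks: {},
                blocksMeta: {},
                entry: {},
                transactionsStatus: {}
              };

              this.writeRequest(pingRequest).catch(console.error);
            }, 30000); // 30 seconds
          }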

          disconnect(): void {
            if (this.stream) {
              this.stream.end();
            }
            this.client.close();
            this.isConnected = false;
          }
        }
        ```

        Create `main.ts`:

        ```typescript theme={"system"}
        import { StreamManager } from './stream-manager';
        import { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";

        // Configuration
        const ENDPOINT = "your-grpc-endpoint"; // LaserStream or Dedicated Node endpoint
        const API_KEY = "YOUR_API_KEY";

        // USDC Token Mint for example
        const USDC_MINT = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v";

        async function main() {
          const streamManager = new StreamManager(
            ENDPOINT,
            API_KEY,
            handleAccountUpdate,
            handleError
          );

          // Subscribe to USDC mint account updates
          const subscribeRequest: SubscribeRequest = {
            accounts: {
              accountSubscribe: {
                account: [USDC_MINT],
                owner: [],
                filters: []
              }
            },
            accountsDataSlice: [],
            commitment: CommitmentLevel.CONFIRMED,
            slots: {},
            transactions: {},
            transactionsStatus: {},
            blocks: {},
            blocksMeta: {},
            entry: {}
          };

          console.log("🚀 Starting USDC mint account monitoring...");
          await streamManager.connect(subscribeRequest);

          // Handle graceful shutdown
          process.on('SIGINT', () => {
            console.log('\n🛑 Shutting down...');
            streamManager.disconnect();
            process.exit(0);
          });
        }

        function handleAccountUpdate(data: any): void {
          if (data.account) {
            const account = data.account.account;
            console.log('\n📊 Account Update:');
            console.log(`  Account: ${account.pubkey}`);
            console.log(`  Owner: ${account.owner}`);
            console.log(`  Lamports: ${account.lamports}`);
            console.log(`  Data Length: ${account.data?.length || 0} bytes`);
            console.log(`  Slot: ${data.account.slot}`);
            console.log(`  Timestamp: ${new Date().toISOString()}`);
          }
          
          if (data.pong) {
            console.log(`💓 Keepalive pong received (id: ${data.pong.id})`);
          }
        }

        function handleError(error: any): void {
          console.error('❌ Stream error:', error.message);
        }

        main().catch(console.error);
        ```

        Run your stream:

        ```bash theme={"system"}
        npx ts-node main.ts
        ```
      </Tab>

      <Tab title="Rust">
        Create `src/main.rs`:

        ```rust theme={"system"}
        use yellowstone_grpc_client::GeyserGrpcClient;
        use yellowstone_grpc_proto::prelude::*;
        use futures::StreamExt;
        use std::collections::HashMap;
        use tokio::time::{sleep, Duration};

        #[tokio::main]
        async fn main() -> anyhow::Result<()> {
            let endpoint = "your-grpc-endpoint"; // Replace with your endpoint
            let token = Some("YOUR_API_KEY".to_string()); // Replace with your API key
            
            let mut client = GeyserGrpcClient::connect(endpoint, token, None).await?;
            
            // USDC mint account
            let usdc_mint = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v";
            
            let mut accounts = HashMap::new();
            accounts.insert(
                "usdc_mint".to_string(),
                SubscribeRequestFilterAccounts {
                    account: vec![usdc_mint.to_string()],
                    owner: vec![],
                    filters: vec![],
                }
            );
            
            let mut stream = client.subscribe_once(
                accounts,
                HashMap::new(), // slots
                HashMap::new(), // transactions
                HashMap::new(), // blocks
                HashMap::new(), // blocks_meta
                None, // commitment
                HashMap::new(), // accounts_data_slice
                Some(CommitmentLevel::Confirmed),
                HashMap::new(), // entry
            ).await?;

            println!("🚀 Connected! Monitoring USDC mint account...");
            
            while let Some(message) = stream.next().await {
                match message {
                    Ok(msg) => {
                        if let Some(account) = msg.update_oneof {
                            match account {
                                subscribe_update::UpdateOneof::Account(account_update) => {
                                    println!("\n📊 Account Update:");
                                    println!("  Account: {}", account_update.account.as_ref()
                                        .map(|a| &a.pubkey).unwrap_or(&"N/A".to_string()));
                                    println!("  Lamports: {}", account_update.account.as_ref()
                                        .map(|a| a.lamports).unwrap_or(0));
                                    println!("  Slot: {}", account_update.slot);
                                }
                                _ => {} // Handle other update types as needed
                            }
                        }
                    }
                    Err(error) => {
                        eprintln!("❌ Stream error: {}", error);
                        sleep(Duration::from_secs(1)).await;
                    }
                }
            }
            
            Ok(())
        }
        ```

        Run your stream:

        ```bash theme={"system"}
        cargo run
        ```
      </Tab>

      <Tab title="Go">
        Create `main.go`:

        ```go theme={"system"}
        package main

        import (
            "context"
            "fmt"
            "log"
            "time"

            "github.com/rpcpool/yellowstone-grpc/examples/golang/pkg/grpc"
            pb "github.com/rpcpool/yellowstone-grpc/examples/golang/pkg/proto"
            "google.golang.org/grpc/metadata"
        )

        func main() {
            endpoint := "your-grpc-endpoint" // Replace with your endpoint
            apiKey := "YOUR_API_KEY"         // Replace with your API key
            
            client, err := grpc.NewGrpcConnection(context.Background(), endpoint)
            if err != nil {
                log.Fatalf("Failed to connect: %v", err)
            }
            defer client.Close()
            
            // Add authentication
            ctx := metadata.AppendToOutgoingContext(context.Background(), "x-token", apiKey)
            
            // USDC mint account
            usdcMint := "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"
            
            stream, err := client.Subscribe(ctx)
            if err != nil {
                log.Fatalf("Failed to subscribe: %v", err)
            }
            
            // Send subscription request
            request := &pb.SubscribeRequest{
                Accounts: map[string]*pb.SubscribeRequestFilterAccounts{
                    "usdc_mint": {
                        Account: []string{usdcMint},
                        Owner:   []string{},
                        Filters: []*pb.SubscribeRequestFilterAccountsFilter{},
                    },
                },
                Commitment: pb.CommitmentLevel_CONFIRMED,
            }
            
            if err := stream.Send(request); err != nil {
                log.Fatalf("Failed to send request: %v", err)
            }
            
            fmt.Println("🚀 Connected! Monitoring USDC mint account...")
            
            for {
                response, err := stream.Recv()
                if err != nil {
                    // Once Recv returns an error the stream is finished, so retrying
                    // on the same stream would loop forever; exit instead
                    log.Fatalf("❌ Stream error: %v", err)
                }
                
                if account := response.GetAccount(); account != nil {
                    fmt.Printf("\n📊 Account Update:\n")
                    fmt.Printf("  Account: %s\n", account.Account.Pubkey)
                    fmt.Printf("  Lamports: %d\n", account.Account.Lamports)
                    fmt.Printf("  Slot: %d\n", account.Slot)
                    fmt.Printf("  Timestamp: %s\n", time.Now().Format(time.RFC3339))
                }
            }
        }
        ```

        Run your stream:

        ```bash theme={"system"}
        go run main.go
        ```
      </Tab>
    </Tabs>
  </Step>

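  <Step title="Test your stream">
    Run your application and confirm that it works:

    1. **Start your stream** with the command for your language
    2. **Look for the connection confirmation** in the console
    3. **Wait for account updates**: the USDC mint account should produce updates periodically
    4. **Test reconnection** by temporarily disconnecting from the internet (the TypeScript example reconnects automatically)
    5. **Verify the keepalive** by watching for a pong message every 30 seconds (the TypeScript example sends the pings)

    **Expected output:**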

    ```
    🚀 Connected! Monitoring USDC mint account...
    ✅ Connected and subscribed successfully
    💓 Keepalive pong received (id: 1703123456789)

    📊 Account Update:
      Account: EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v
      Owner: TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA
      Lamports: 1461600
      Data Length: 82 bytes
      Slot: 275123456
      Timestamp: 2024-01-15T10:30:45.123Z
    ```
  </Step>
</Steps>
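
The keepalive check in the last step is manual. One way to automate it is a small watchdog that records when the last pong arrived and reports the stream as stale after a chosen silence window. The sketch below is a hypothetical helper, not part of `@triton-one/yellowstone-grpc`, and the 90-second window (three missed pings at the 30-second interval) is an assumption to tune for your workload.

```typescript
// Hypothetical helper: tracks the time of the last pong and flags the
// stream as stale once the silence exceeds maxSilenceMs.
class PongWatchdog {
  private lastPongAt: number;
  private maxSilenceMs: number;

  constructor(maxSilenceMs: number, startedAt: number = Date.now()) {
    this.maxSilenceMs = maxSilenceMs;
    this.lastPongAt = startedAt;
  }

  // Call this whenever a pong message arrives.
  recordPong(now: number = Date.now()): void {
    this.lastPongAt = now;
  }

  // True when no pong has arrived within the allowed window.
  isStale(now: number = Date.now()): boolean {
    return now - this.lastPongAt > this.maxSilenceMs;
  }
}

// Timestamps are passed explicitly here so the example is deterministic.
const watchdog = new PongWatchdog(90000, 0);
watchdog.recordPong(30000);
console.log(watchdog.isStale(60000));  // false: 30 s since the last pong
console.log(watchdog.isStale(130000)); // true: 100 s since the last pong
```

In the TypeScript example, `recordPong()` could be called from the `data.pong` branch of `handleAccountUpdate`, with `isStale()` checked on a timer to raise an alert.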

## What's Next?

Now that you have a working gRPC stream, explore these monitoring guides:

<CardGroup cols={2}>
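  <Card title="Account Monitoring" icon="user" href="/zh/grpc/account-monitoring">
    Advanced account filtering and data-slicing techniques
  </Card>

  <Card title="Transaction Monitoring" icon="receipt" href="/zh/grpc/transaction-monitoring">
    Stream transactions with program filtering and execution details
  </Card>

  <Card title="Slot and Block Monitoring" icon="cube" href="/zh/grpc/slot-and-block-monitoring">
    Monitor network consensus and block production
  </Card>

  <Card title="Stream Pump AMM Data" icon="chart-line" href="/zh/grpc/stream-pump-amm-data">
    Real-world example: monitoring DeFi protocol data
  </Card>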
</CardGroup>

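## Troubleshooting

<Accordion title="Connection issues">
  **Symptoms:** Connection timeouts, authentication errors

  **Solutions:**

  * Verify your endpoint URL and API key
  * Check that your plan includes gRPC access
  * Make sure you're using the correct port (typically 2053 for Dedicated Nodes)
  * LaserStream devnet is available on all plans; mainnet requires at least the Business plan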
</Accordion>
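
A quick sanity check on the endpoint string can catch a missing port or a stray space before you start debugging the connection itself. The helper below is a hypothetical sketch that only inspects the string; it does not contact the server, and its regular expression covers just the two endpoint forms shown in this guide (`https://host` and `host:port`).

```typescript
interface EndpointInfo {
  scheme: string | null;
  host: string;
  port: number | null;
}

// Splits an endpoint such as "https://host" or "host:2053" into its parts.
// Returns null when the string matches neither form.
function parseEndpoint(endpoint: string): EndpointInfo | null {
  const match = /^(?:(https?):\/\/)?([^\s:\/]+)(?::(\d+))?\/?$/.exec(endpoint.trim());
  if (!match) return null;
  const [, scheme, host, port] = match;
  if (!host) return null;
  return {
    scheme: scheme ?? null,
    host,
    port: port ? Number(port) : null,
  };
}

const info = parseEndpoint("your-node.rpc.helius.dev:2053");
if (info === null) {
  console.log("Endpoint does not look valid");
} else if (info.scheme === null && info.port === null) {
  console.log("Endpoint has neither a scheme nor a port; check it against your dashboard");
} else {
  console.log(`Host ${info.host}, port ${info.port ?? "default"}`);
}
```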

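<Accordion title="No data received">
  **Symptoms:** The stream connects successfully, but no account updates appear

  **Solutions:**

  * The USDC mint account updates infrequently; try monitoring a more active account
  * Check your commitment level (try `PROCESSED` for more frequent updates)
  * Verify your account filter configuration
  * Monitor a token account instead of the mint for more activity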
</Accordion>
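
To try a different account or commitment level without retyping the whole request, a small builder can help. The sketch below reproduces the request shape from `main.ts`; `buildAccountRequest` is a hypothetical helper, and `"YOUR_TOKEN_ACCOUNT"` and the `"PROCESSED"` string are placeholders standing in for a real address and the library's `CommitmentLevel.PROCESSED`.

```typescript
// Builds the same request shape used in main.ts, with the account list and
// commitment level passed in. The generic C stands in for the library's
// CommitmentLevel type so that this sketch needs no imports.
function buildAccountRequest<C>(label: string, accounts: string[], commitment: C) {
  return {
    accounts: {
      [label]: { account: accounts, owner: [] as string[], filters: [] as unknown[] },
    },
    accountsDataSlice: [] as unknown[],
    commitment,
    slots: {},
    transactions: {},
    transactionsStatus: {},
    blocks: {},
    blocksMeta: {},
    entry: {},
  };
}

// Placeholders only: in real code pass an actual token account address
// and CommitmentLevel.PROCESSED from @triton-one/yellowstone-grpc.
const request = buildAccountRequest("active_account", ["YOUR_TOKEN_ACCOUNT"], "PROCESSED");
console.log(JSON.stringify(request.accounts));
```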

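<Accordion title="Stream disconnects">
  **Symptoms:** Frequent disconnects, reconnection loops

  **Solutions:**

  * Implement exponential backoff (included in the example above)
  * Check your network stability
  * Make sure the keepalive pings are working (every 30 seconds)
  * Watch for server-side rate limits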
</Accordion>
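
To see what the backoff in the TypeScript example does in practice, the sketch below isolates its delay formula and prints the schedule for all ten attempts. The function name `backoffDelay` is introduced here for illustration; the defaults (1000 ms base, exponent capped at 5, ten attempts) are taken from `StreamManager`.

```typescript
// Mirrors the delay formula in StreamManager.reconnect:
// baseMs * 2^min(attempt - 1, maxExponent)
function backoffDelay(attempt: number, baseMs: number = 1000, maxExponent: number = 5): number {
  return baseMs * Math.pow(2, Math.min(attempt - 1, maxExponent));
}

// Delays for attempts 1 through 10, in milliseconds.
const schedule = Array.from({ length: 10 }, (_, i) => backoffDelay(i + 1));
console.log(schedule.join(", "));
// 1000, 2000, 4000, 8000, 16000, 32000, 32000, 32000, 32000, 32000
```

With these defaults the delays add up to 191 seconds before the manager gives up, so an outage longer than about three minutes needs a higher attempt limit or an external restart.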

## Best Practices

<Note>
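  **Production readiness checklist:**

  * ✅ Implement exponential backoff for reconnections
  * ✅ Send a keepalive ping every 30 seconds
  * ✅ Handle all stream events (data, error, end, close)
  * ✅ Process data asynchronously to avoid blocking
  * ✅ Monitor connection health and alert on failures
  * ✅ Use a commitment level appropriate for your use case
  * ✅ Filter data as specifically as possible to reduce bandwidth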
</Note>
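
For the asynchronous-processing item, one possible approach is to have the stream's data callback only enqueue updates while a separate loop works through them. The sketch below is a hypothetical, minimal in-memory queue, not part of any Yellowstone library. Dropping the oldest update when the buffer is full is an assumption that suits latest-state monitoring and is the wrong choice when every update must be processed.

```typescript
// Hypothetical bounded queue: push() returns immediately, and a single
// drain loop hands items to the async handler one at a time.
class UpdateQueue<T> {
  private items: T[] = [];
  private draining = false;
  private handler: (item: T) => Promise<void>;
  private maxSize: number;
  dropped = 0;

  constructor(handler: (item: T) => Promise<void>, maxSize: number = 1000) {
    this.handler = handler;
    this.maxSize = maxSize;
  }

  // Number of updates still waiting to be handled.
  get pending(): number {
    return this.items.length;
  }

  // Intended to be called from the stream's data callback.
  push(item: T): void {
    if (this.items.length >= this.maxSize) {
      this.items.shift(); // drop the oldest update when the buffer is full
      this.dropped++;
    }
    this.items.push(item);
    void this.drain();
  }

  private async drain(): Promise<void> {
    if (this.draining) return; // only one drain loop runs at a time
    this.draining = true;
    try {
      while (this.items.length > 0) {
        const item = this.items.shift() as T;
        try {
          await this.handler(item);
        } catch (err) {
          console.error("Handler failed:", err);
        }
      }
    } finally {
      this.draining = false;
    }
  }
}

// Usage: a tiny buffer of two slots, fed four updates in a burst.
const queue = new UpdateQueue<number>(async (slot) => {
  console.log(`Processed update for slot ${slot}`);
}, 2);
[1, 2, 3, 4].forEach((slot) => queue.push(slot));
console.log(`Dropped so far: ${queue.dropped}`);
```

With `StreamManager`, the `onData` callback could call `queue.push(data)` so that slow processing never delays reading from the stream.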
