准备将实时区块链数据添加到您的应用程序中吗?本指南将帮助您在几分钟内连接并订阅 Solana 数据。

快速入门指南

1

获取您的 API 密钥

仪表板获取您的 Helius API 密钥。您需要它进行身份验证。
2

选择您的网络

选择适当的端点:
  • Mainnet: wss://mainnet.helius-rpc.com/?api-key=YOUR_API_KEY
  • Devnet: wss://devnet.helius-rpc.com/?api-key=YOUR_API_KEY
3

创建您的第一个连接

const ws = new WebSocket('wss://mainnet.helius-rpc.com/?api-key=YOUR_API_KEY');

ws.onopen = () => {
  console.log('Connected to Solana!');
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received:', data);
};
4

订阅更新

// Subscribe to account changes
const subscribeMessage = {
  "jsonrpc": "2.0",
  "id": 1,
  "method": "accountSubscribe",
  "params": [
    "ACCOUNT_ADDRESS_HERE",
    { "commitment": "confirmed" }
  ]
};

ws.send(JSON.stringify(subscribeMessage));

订阅方法

账户监控

监控特定账户的余额变化、数据更新或所有权转移。
const ws = new WebSocket('wss://mainnet.helius-rpc.com/?api-key=YOUR_API_KEY');

const subscribeToAccount = (accountAddress) => {
  const request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "accountSubscribe",
    "params": [
      accountAddress,
      {
        "encoding": "jsonParsed",
        "commitment": "confirmed"
      }
    ]
  };
  
  ws.send(JSON.stringify(request));
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  
  if (data.method === "accountNotification") {
    console.log("Account updated:", data.params.result.value);
    
    // Handle the account change
    const accountInfo = data.params.result.value;
    console.log(`New balance: ${accountInfo.lamports} lamports`);
  }
};

// Subscribe to a wallet address
subscribeToAccount("9PejEmViKHgUkVFWN57cNEZnFS4Qo6SzsLj5UPAXfDTF");

程序活动监控

监控由程序拥有的所有账户 - 非常适合监控 DEX 交易、NFT 转移或 DeFi 活动。
// Monitor Raydium AMM for all trade activity
const RAYDIUM_AMM_PROGRAM = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8";

const subscribeToProgram = (programId) => {
  const request = {
    "jsonrpc": "2.0",
    "id": 3,
    "method": "programSubscribe",
    "params": [
      programId,
      {
        "encoding": "jsonParsed",
        "commitment": "confirmed",
        "filters": [
          {
            "dataSize": 752 // Raydium pool account size
          }
        ]
      }
    ]
  };
  
  ws.send(JSON.stringify(request));
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  
  if (data.method === "programNotification") {
    console.log("Raydium pool updated:", data.params.result.value.pubkey);
    // Process the pool data change
    const accountData = data.params.result.value.account;
    // Parse and handle the pool state change
  }
};

subscribeToProgram(RAYDIUM_AMM_PROGRAM);

交易监控

跟踪特定交易或监控提到某些账户的交易。
// Monitor a specific transaction signature
const trackTransaction = (signature) => {
  const request = {
    "jsonrpc": "2.0",
    "id": 5,
    "method": "signatureSubscribe",
    "params": [
      signature,
      {
        "commitment": "confirmed"
      }
    ]
  };
  
  ws.send(JSON.stringify(request));
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  
  if (data.method === "signatureNotification") {
    const result = data.params.result;
    
    if (result.value.err) {
      console.log("Transaction failed:", result.value.err);
    } else {
      console.log("Transaction confirmed successfully!");
    }
    
    // Subscription automatically ends after notification
  }
};

// Track a payment or swap transaction
trackTransaction("YOUR_TRANSACTION_SIGNATURE_HERE");

准生产环境实现

这是一个具有重连逻辑、错误处理和适当状态管理的健壮的 WebSocket 实现:
class WebSocketManager {
  constructor(endpoint) {
    this.endpoint = endpoint;
    this.ws = null;
    this.subscriptions = new Map();
    this.isConnected = false;
  }

  async connect() {
    this.ws = new WebSocket(this.endpoint);
    
    this.ws.onopen = () => {
      console.log('Connected');
      this.isConnected = true;
      this.resubscribeAll();
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      
      // Handle subscription confirmations
      if (data.result && typeof data.result === 'number') {
        const sub = Array.from(this.subscriptions.values())
          .find(s => s.requestId === data.id);
        if (sub) sub.subscriptionId = data.result;
        return;
      }

      // Handle notifications
      if (data.method?.endsWith('Notification')) {
        const sub = Array.from(this.subscriptions.values())
          .find(s => s.subscriptionId === data.params.subscription);
        if (sub?.callback) sub.callback(data.params.result);
      }
    };

    this.ws.onclose = () => {
      console.log('Disconnected');
      this.isConnected = false;
      // Implement reconnection logic here
    };
  }

  subscribe(method, params, callback) {
    const requestId = Date.now();
    const subscription = { requestId, method, params, callback, subscriptionId: null };
    this.subscriptions.set(requestId, subscription);
    
    if (this.isConnected) {
      this.ws.send(JSON.stringify({
        jsonrpc: '2.0',
        id: requestId,
        method,
        params
      }));
    }
    
    return requestId;
  }

  resubscribeAll() {
    for (const [id, sub] of this.subscriptions) {
      sub.subscriptionId = null;
      this.ws.send(JSON.stringify({
        jsonrpc: '2.0',
        id,
        method: sub.method,
        params: sub.params
      }));
    }
  }
}

// Usage
const wsManager = new WebSocketManager('wss://mainnet.helius-rpc.com/?api-key=YOUR_API_KEY');
await wsManager.connect();

// Subscribe to account changes
wsManager.subscribe(
  'accountSubscribe',
  ['account-address-here', { commitment: 'confirmed' }],
  (data) => console.log('Account updated:', data)
);

探索更多 WebSocket 方法

上面的示例涵盖了最常用的订阅类型,但 Solana 的 WebSocket API 提供了更多用于专业监控需求的方法。

完整的 WebSocket API 参考

发现 18+ WebSocket 方法,包括详细的文档、参数和响应格式。

真实世界的用例

DeFi 交易仪表板

class LiquidityPoolMonitor {
  constructor(wsManager) {
    this.wsManager = wsManager;
    this.pools = new Map();
  }

  monitorPool(poolAddress, tokenAMint, tokenBMint) {
    // Monitor the pool account itself
    const poolSubscription = this.wsManager.subscribe(
      'accountSubscribe',
      [poolAddress, { encoding: 'base64', commitment: 'confirmed' }],
      (data) => this.handlePoolUpdate(poolAddress, data)
    );

    // Monitor token reserves
    const tokenASubscription = this.wsManager.subscribe(
      'accountSubscribe', 
      [tokenAMint, { encoding: 'jsonParsed', commitment: 'confirmed' }],
      (data) => this.handleReserveUpdate(poolAddress, 'tokenA', data)
    );

    const tokenBSubscription = this.wsManager.subscribe(
      'accountSubscribe',
      [tokenBMint, { encoding: 'jsonParsed', commitment: 'confirmed' }],
      (data) => this.handleReserveUpdate(poolAddress, 'tokenB', data)
    );

    this.pools.set(poolAddress, {
      poolSubscription,
      tokenASubscription, 
      tokenBSubscription,
      lastUpdate: Date.now()
    });
  }

  handlePoolUpdate(poolAddress, data) {
    // Decode pool state and calculate price
    const poolState = this.decodePoolState(data.value.data);
    
    console.log(`Pool ${poolAddress} updated:`);
    console.log(`- Price: ${poolState.price}`);
    console.log(`- Liquidity: ${poolState.liquidity}`);
    console.log(`- Volume 24h: ${poolState.volume24h}`);
    
    // Emit price update event
    this.emitPriceUpdate(poolAddress, poolState);
  }

  handleReserveUpdate(poolAddress, tokenType, data) {
    const tokenAmount = data.value.data.parsed.info.tokenAmount;
    console.log(`${tokenType} reserve updated: ${tokenAmount.uiAmount}`);
    
    // Update internal state and recalculate price
    this.updatePoolPrice(poolAddress, tokenType, tokenAmount);
  }

  decodePoolState(data) {
    // Implement pool state decoding based on DEX
    // This is pseudo-code - actual implementation depends on the DEX
    return {
      price: 0,
      liquidity: 0,
      volume24h: 0
    };
  }

  emitPriceUpdate(poolAddress, poolState) {
    // Emit custom events for price updates
    window.dispatchEvent(new CustomEvent('poolPriceUpdate', {
      detail: { poolAddress, ...poolState }
    }));
  }
}

// Usage
const poolMonitor = new LiquidityPoolMonitor(wsManager);
poolMonitor.monitorPool(
  'POOL_ADDRESS',
  'TOKEN_A_MINT', 
  'TOKEN_B_MINT'
);

投资组合追踪器

class PortfolioTracker {
  constructor(wsManager) {
    this.wsManager = wsManager;
    this.watchedAddresses = new Set();
    this.balances = new Map();
  }

  addWallet(walletAddress) {
    if (this.watchedAddresses.has(walletAddress)) return;

    this.watchedAddresses.add(walletAddress);
    
    // Monitor SOL balance
    const solSubscription = this.wsManager.subscribe(
      'accountSubscribe',
      [walletAddress, { encoding: 'jsonParsed', commitment: 'confirmed' }],
      (data) => this.handleSolBalanceUpdate(walletAddress, data)
    );

    // Monitor all token accounts owned by this wallet
    const tokenSubscription = this.wsManager.subscribe(
      'programSubscribe',
      [
        'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA',
        {
          encoding: 'jsonParsed',
          commitment: 'confirmed',
          filters: [
            { memcmp: { offset: 32, bytes: walletAddress } }
          ]
        }
      ],
      (data) => this.handleTokenBalanceUpdate(walletAddress, data)
    );

    console.log(`Now tracking portfolio for ${walletAddress}`);
  }

  handleSolBalanceUpdate(walletAddress, data) {
    const lamports = data.value.lamports;
    const solBalance = lamports / 1e9;
    
    this.balances.set(`${walletAddress}:SOL`, solBalance);
    
    console.log(`${walletAddress} SOL balance: ${solBalance}`);
    this.emitBalanceUpdate(walletAddress, 'SOL', solBalance);
  }

  handleTokenBalanceUpdate(walletAddress, data) {
    const tokenData = data.value.account.data.parsed.info;
    const tokenMint = tokenData.mint;
    const balance = parseFloat(tokenData.tokenAmount.uiAmount);
    
    this.balances.set(`${walletAddress}:${tokenMint}`, balance);
    
    console.log(`${walletAddress} token ${tokenMint} balance: ${balance}`);
    this.emitBalanceUpdate(walletAddress, tokenMint, balance);
  }

  emitBalanceUpdate(walletAddress, asset, balance) {
    window.dispatchEvent(new CustomEvent('balanceUpdate', {
      detail: { walletAddress, asset, balance, timestamp: Date.now() }
    }));
  }

  getPortfolioValue(walletAddress) {
    // Calculate total portfolio value in USD
    // This would integrate with price feeds
    let totalValue = 0;
    
    for (const [key, balance] of this.balances) {
      if (key.startsWith(walletAddress)) {
        const asset = key.split(':')[1];
        const price = this.getAssetPrice(asset); // Implement price lookup
        totalValue += balance * price;
      }
    }
    
    return totalValue;
  }
}

NFT 市场集成

class NFTActivityMonitor {
  constructor(wsManager) {
    this.wsManager = wsManager;
    this.watchedCollections = new Map();
  }

  monitorCollection(collectionAddress, options = {}) {
    const { includeListings = true, includeSales = true, includeCancellations = true } = options;

    // Monitor the collection's update authority or creator address
    const collectionSubscription = this.wsManager.subscribe(
      'logsSubscribe',
      [
        { mentions: [collectionAddress] },
        { commitment: 'confirmed' }
      ],
      (data) => this.handleCollectionActivity(collectionAddress, data)
    );

    this.watchedCollections.set(collectionAddress, {
      subscription: collectionSubscription,
      options,
      stats: {
        listings: 0,
        sales: 0,
        volume: 0
      }
    });

    console.log(`Monitoring NFT collection: ${collectionAddress}`);
  }

  handleCollectionActivity(collectionAddress, data) {
    const logs = data.value.logs;
    const signature = data.value.signature;

    // Parse marketplace-specific logs
    const activity = this.parseMarketplaceActivity(logs);
    
    if (activity) {
      console.log(`NFT Activity in ${collectionAddress}:`);
      console.log(`- Type: ${activity.type}`);
      console.log(`- Price: ${activity.price} SOL`);
      console.log(`- Signature: ${signature}`);

      // Update collection stats
      this.updateCollectionStats(collectionAddress, activity);
      
      // Emit activity event
      this.emitNFTActivity(collectionAddress, activity, signature);
    }
  }

  parseMarketplaceActivity(logs) {
    // Parse Magic Eden, OpenSea, or other marketplace logs
    for (const log of logs) {
      // Magic Eden listing
      if (log.includes('Instruction: List')) {
        const price = this.extractPriceFromLog(log);
        return { type: 'listing', price };
      }
      
      // Magic Eden sale
      if (log.includes('Instruction: ExecuteSale')) {
        const price = this.extractPriceFromLog(log);
        return { type: 'sale', price };
      }
      
      // Listing cancellation
      if (log.includes('Instruction: CancelBuy') || log.includes('Instruction: CancelSell')) {
        return { type: 'cancellation', price: 0 };
      }
    }
    
    return null;
  }

  extractPriceFromLog(log) {
    // Extract price from marketplace logs - implementation depends on marketplace
    // This is pseudo-code
    const priceMatch = log.match(/price:\s*(\d+)/);
    return priceMatch ? parseInt(priceMatch[1]) / 1e9 : 0;
  }

  updateCollectionStats(collectionAddress, activity) {
    const collection = this.watchedCollections.get(collectionAddress);
    if (!collection) return;

    switch (activity.type) {
      case 'listing':
        collection.stats.listings++;
        break;
      case 'sale':
        collection.stats.sales++;
        collection.stats.volume += activity.price;
        break;
    }
  }

  emitNFTActivity(collectionAddress, activity, signature) {
    window.dispatchEvent(new CustomEvent('nftActivity', {
      detail: {
        collection: collectionAddress,
        activity,
        signature,
        timestamp: Date.now()
      }
    }));
  }
}

React 集成示例

import React, { useEffect, useState } from 'react';

function WebSocketComponent() {
  const [accountData, setAccountData] = useState(null);
  const [wsManager, setWsManager] = useState(null);

  useEffect(() => {
    // Initialize WebSocket
    const manager = new WebSocketManager('wss://mainnet.helius-rpc.com/?api-key=YOUR_API_KEY');
    
    manager.connect().then(() => {
      // Subscribe to account changes
      manager.subscribe(
        'accountSubscribe',
        ['account-address-here', { commitment: 'confirmed' }],
        (data) => setAccountData(data)
      );
    });

    setWsManager(manager);

    // Cleanup on unmount
    return () => {
      if (manager) manager.disconnect();
    };
  }, []);

  return (
    <div>
      <h3>Account Monitor</h3>
      {accountData ? (
        <div>
          <p>Balance: {accountData.value.lamports} lamports</p>
          <p>Owner: {accountData.value.owner}</p>
        </div>
      ) : (
        <p>Waiting for account updates...</p>
      )}
    </div>
  );
}

性能优化

// Use connection pooling for multiple subscriptions
class WebSocketPool {
  constructor(endpoint, maxConnections = 3) {
    this.endpoint = endpoint;
    this.connections = [];
    this.maxConnections = maxConnections;
    this.currentConnectionIndex = 0;
  }
  
  async getConnection() {
    if (this.connections.length < this.maxConnections) {
      const wsManager = new SolanaWebSocketManager(this.endpoint);
      await wsManager.connect();
      this.connections.push(wsManager);
      return wsManager;
    }
    
    // Round-robin selection
    const connection = this.connections[this.currentConnectionIndex];
    this.currentConnectionIndex = (this.currentConnectionIndex + 1) % this.connections.length;
    return connection;
  }
}

错误处理和调试

健壮的错误处理对于生产环境中的 WebSocket 应用程序至关重要。实施这些模式以优雅地处理网络问题、速率限制和 API 错误。
class ErrorHandlingWebSocket extends SolanaWebSocketManager {
  constructor(endpoint, options = {}) {
    super(endpoint, options);
    this.errorCounts = new Map();
    this.maxErrorsPerMinute = options.maxErrorsPerMinute || 10;
  }

  handleMessage(data) {
    try {
      // Handle rate limiting
      if (data.error && data.error.code === 429) {
        console.warn('Rate limited, backing off...');
        this.handleRateLimit();
        return;
      }

      // Handle subscription errors
      if (data.error && data.error.code === -32602) {
        console.error('Invalid subscription parameters:', data.error.message);
        this.handleInvalidSubscription(data.id);
        return;
      }

      // Handle network errors
      if (data.error && data.error.code === -32603) {
        console.error('Internal server error:', data.error.message);
        this.handleServerError();
        return;
      }

      super.handleMessage(data);
    } catch (error) {
      console.error('Error processing WebSocket message:', error);
      this.incrementErrorCount('message_processing');
    }
  }

  handleRateLimit() {
    // Exponential backoff for rate limiting
    const backoffDelay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
    
    setTimeout(() => {
      if (this.subscriptions.size > 0) {
        console.log('Retrying after rate limit...');
        this.resubscribeAll();
      }
    }, backoffDelay);
  }

  handleInvalidSubscription(requestId) {
    // Remove invalid subscription
    const subscription = Array.from(this.subscriptions.entries())
      .find(([id, sub]) => sub.requestId === requestId);
    
    if (subscription) {
      console.warn(`Removing invalid subscription: ${subscription[1].method}`);
      this.subscriptions.delete(subscription[0]);
    }
  }

  handleServerError() {
    // Log server error and potentially switch endpoints
    this.incrementErrorCount('server_error');
    
    if (this.getErrorCount('server_error') > 5) {
      console.error('Too many server errors, consider switching endpoints');
      this.eventEmitter.dispatchEvent(new CustomEvent('tooManyServerErrors'));
    }
  }

  incrementErrorCount(errorType) {
    const now = Date.now();
    const errors = this.errorCounts.get(errorType) || [];
    
    // Add current error
    errors.push(now);
    
    // Remove errors older than 1 minute
    const oneMinuteAgo = now - 60000;
    const recentErrors = errors.filter(timestamp => timestamp > oneMinuteAgo);
    
    this.errorCounts.set(errorType, recentErrors);
    
    // Check if we're exceeding error threshold
    if (recentErrors.length > this.maxErrorsPerMinute) {
      console.error(`Too many ${errorType} errors in the last minute`);
      this.eventEmitter.dispatchEvent(new CustomEvent('errorThresholdExceeded', {
        detail: { errorType, count: recentErrors.length }
      }));
    }
  }

  getErrorCount(errorType) {
    const errors = this.errorCounts.get(errorType) || [];
    const oneMinuteAgo = Date.now() - 60000;
    return errors.filter(timestamp => timestamp > oneMinuteAgo).length;
  }
}

下一步

如有疑问或需要额外支持,请加入我们的 Discord 社区,我们的团队和社区成员随时准备提供帮助!