跳转到主要内容
Enhanced WebSockets 提供快速的 JSON 流,包含解码的账户和交易。无需自定义网络栈。Enhanced WebSockets 可用于商业计划及以上。

工作原理

要求

  • Node.js ≥ 18 (测试版本为 v20)
  • 如果计划使用 ts‑node 运行 .ts 示例,需要 TypeScript ≥ 5
  • 需要 Helius 商业计划或更高版本
  • 需要一个名为 HELIUS_API_KEY环境变量来存储您的 API 密钥
全局安装依赖项:npm i -g typescript ts‑node

实现

1

安装依赖项

npm install ws
2

创建 WebSocket 客户端

创建一个名为 enhanced-ws-pump.ts 的文件,包含以下代码:
// enhanced-ws-pump.ts
import WebSocket from 'ws';

// Configuration for reconnection
const MAX_RETRIES = 5;
const INITIAL_RETRY_DELAY = 1000; // 1 second
let retryCount = 0;
let retryDelay = INITIAL_RETRY_DELAY;

// Function to create a new WebSocket connection
function createWebSocket() {
  return new WebSocket(`wss://mainnet.helius-rpc.com/?api-key=${process.env.HELIUS_API_KEY}`);
}

// Function to send a request to the WebSocket server
function sendRequest(ws: WebSocket) {
  const request = {
    jsonrpc: "2.0",
    id: 420,
    method: "transactionSubscribe",
    params: [
      {
        accountInclude: ["pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA"]
      },
      {
        commitment: "processed",
        encoding: "jsonParsed",
        transactionDetails: "full",
        maxSupportedTransactionVersion: 0
      }
    ]
  };
  ws.send(JSON.stringify(request));
}

// Function to send a ping to the WebSocket server
function startPing(ws: WebSocket) {
  return setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.ping();
      console.log('Ping sent');
    }
  }, 30000); // Ping every 30 seconds
}

// Function to handle reconnection
function reconnect() {
  if (retryCount >= MAX_RETRIES) {
    console.error('Maximum retry attempts reached.');
    return;
  }

  console.log(`Attempting to reconnect in ${retryDelay/1000} seconds... (Attempt ${retryCount + 1}/${MAX_RETRIES})`);

  setTimeout(() => {
    retryCount++;
    retryDelay *= 2; // Exponential backoff
    initializeWebSocket();
  }, retryDelay);
}

// Function to initialize WebSocket with all event handlers
function initializeWebSocket() {
  const ws = createWebSocket();
  let pingInterval: NodeJS.Timeout;

  ws.on('open', function open() {
    console.log('WebSocket is open');
    retryCount = 0; // Reset retry count on successful connection
    retryDelay = INITIAL_RETRY_DELAY; // Reset retry delay
    sendRequest(ws);
    pingInterval = startPing(ws);
  });

  ws.on('message', function incoming(data: WebSocket.Data) {
    const messageStr = data.toString('utf8');
    try {
      const messageObj = JSON.parse(messageStr);
      
      // Check if it's a subscription confirmation
      if (messageObj.result !== undefined) {
        console.log('Subscription confirmed:', messageObj);
        return;
      }
      
      // Check if it's transaction data
      if (messageObj.params && messageObj.params.result) {
        const transaction = messageObj.params.result;
        console.log('Received transaction:', JSON.stringify(transaction, null, 2));
      }
    } catch (e) {
      console.error('Failed to parse JSON:', e);
    }
  });

  ws.on('error', function error(err: Error) {
    console.error('WebSocket error:', err);
  });

  ws.on('close', function close() {
    console.log('WebSocket is closed');
    if (pingInterval) {
      clearInterval(pingInterval);
    }
    reconnect();
  });
}

// Start the WebSocket connection
initializeWebSocket();

// Handle program termination
process.on('SIGINT', () => {
  console.log('Shutting down...');
  process.exit(0);
});
3

设置环境变量

添加您的 Helius API 密钥作为环境变量:
export HELIUS_API_KEY=your-helius-api-key
your-helius-api-key 替换为您从仪表板中获取的实际 Helius API 密钥。如果您没有 API 密钥,注册 一个账户,或者让您的代理使用 Helius CLI 以编程方式创建一个。
4

运行应用程序

执行脚本以开始流式传输 Pump AMM 数据:
npx ts-node enhanced-ws-pump.ts
您将在终端中看到解析后的 Pump AMM 交易。当套接字关闭时,客户端会自动重试。

主要优势

  • 浏览器兼容 - WSS 在 Node.js 和浏览器环境中均可使用
  • 丰富数据 - 全解析的交易对象,包含解码指令和账户
  • 简单实现 - 无需特殊库(只需标准 WSS 客户端)
  • 自动重连 - 内置重试逻辑确保稳定连接

常见问题和解决方案

验证你的 HELIUS_API_KEY 是否正确。
确保 Pump AMM 程序地址 正确并且有活动。
实现更健壮的重新连接逻辑或检查网络稳定性。

下一步

1

创建UI仪表板

使用React或Vue.js构建一个网页界面,以实时可视化传入的Pump AMM交易。
2

实现数据库存储

将交易数据存储在MongoDB或PostgreSQL等数据库中以进行历史分析:
import { MongoClient } from 'mongodb';

// Setup MongoDB connection
async function setupDatabase() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  return client.db('pump-amm').collection('transactions');
}

// Then in your message handler:
ws.on('message', async function incoming(data: WebSocket.Data) {
  const messageStr = data.toString('utf8');
  try {
    const messageObj = JSON.parse(messageStr);
    
    if (messageObj.params && messageObj.params.result) {
      const transaction = messageObj.params.result;
      
      // Store in database
      const collection = await setupDatabase();
      await collection.insertOne({
        timestamp: new Date(),
        transaction: transaction
      });
      
      console.log('Transaction stored in database');
    }
  } catch (e) {
    console.error('Failed to process message:', e);
  }
});
3

设置警报系统

使用Discord webhooks等服务配置高价值交易或特定模式的警报:
import axios from 'axios';

// Send alert to Discord webhook
async function sendAlert(message: string) {
  await axios.post('YOUR_DISCORD_WEBHOOK_URL', {
    content: message
  });
}

// Then in your message handler:
if (messageObj.params && messageObj.params.result) {
  const transaction = messageObj.params.result;
  
  // Example: Check for transactions above a certain value
  const isHighValue = checkIfHighValueTransaction(transaction);
  
  if (isHighValue) {
    sendAlert(`High-value transaction detected: ${transaction.signature}`);
  }
}
4

实现心跳监控

添加一个更强大的心跳系统以确保持续连接:
// Enhanced heartbeat system
function setupHeartbeat(ws: WebSocket) {
  let lastPongTime = Date.now();
  
  // Send ping regularly
  const pingInterval = setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.ping();
    }
  }, 30000);
  
  // Track pong responses
  ws.on('pong', () => {
    lastPongTime = Date.now();
  });
  
  // Check connection health
  const healthCheck = setInterval(() => {
    const now = Date.now();
    if (now - lastPongTime > 90000) {  // No pong for 90 seconds
      console.warn('Connection seems unresponsive, reconnecting...');
      ws.terminate();
      clearInterval(pingInterval);
      clearInterval(healthCheck);
    }
  }, 30000);
  
  return { pingInterval, healthCheck };
}