增强型 WebSocket

为什么选择增强型 WebSocket – 快速的 JSON 流,包含解码的账户和交易。无需自定义网络堆栈。

可用于商业及更高计划。

工作原理 – 连接到 Atlas 端点,订阅 Pump AMM 交易,并监听更新。示例重试五次,采用指数退避策略。

要求

  • Node.js ≥ 18(已在 v20 上测试)
  • 如果计划使用 ts‑node 运行 .ts 示例,则需要 TypeScript ≥ 5
  • 需要 Helius 商业计划或更高
  • 一个名为 HELIUS_API_KEY环境变量,用于存储您的 API 密钥
全局安装依赖项:npm i -g typescript ts‑node

实施

1

安装依赖项

npm install ws
2

创建 WebSocket 客户端

创建一个名为 enhanced-ws-pump.ts 的文件,并包含以下代码:
// enhanced-ws-pump.ts
import WebSocket from 'ws';

// Configuration for reconnection
const MAX_RETRIES = 5;
const INITIAL_RETRY_DELAY = 1000; // 1 second
let retryCount = 0;
let retryDelay = INITIAL_RETRY_DELAY;

// Function to create a new WebSocket connection
function createWebSocket() {
  return new WebSocket(`wss://atlas-mainnet.helius-rpc.com/?api-key=${process.env.HELIUS_API_KEY}`);
}

// Function to send a request to the WebSocket server
function sendRequest(ws: WebSocket) {
  const request = {
    jsonrpc: "2.0",
    id: 420,
    method: "transactionSubscribe",
    params: [
      {
        accountInclude: ["pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA"]
      },
      {
        commitment: "processed",
        encoding: "jsonParsed",
        transactionDetails: "full",
        maxSupportedTransactionVersion: 0
      }
    ]
  };
  ws.send(JSON.stringify(request));
}

// Function to send a ping to the WebSocket server
function startPing(ws: WebSocket) {
  return setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.ping();
      console.log('Ping sent');
    }
  }, 30000); // Ping every 30 seconds
}

// Function to handle reconnection
function reconnect() {
  if (retryCount >= MAX_RETRIES) {
    console.error('Maximum retry attempts reached.');
    return;
  }

  console.log(`Attempting to reconnect in ${retryDelay/1000} seconds... (Attempt ${retryCount + 1}/${MAX_RETRIES})`);

  setTimeout(() => {
    retryCount++;
    retryDelay *= 2; // Exponential backoff
    initializeWebSocket();
  }, retryDelay);
}

// Function to initialize WebSocket with all event handlers
function initializeWebSocket() {
  const ws = createWebSocket();
  let pingInterval: NodeJS.Timeout;

  ws.on('open', function open() {
    console.log('WebSocket is open');
    retryCount = 0; // Reset retry count on successful connection
    retryDelay = INITIAL_RETRY_DELAY; // Reset retry delay
    sendRequest(ws);
    pingInterval = startPing(ws);
  });

  ws.on('message', function incoming(data: WebSocket.Data) {
    const messageStr = data.toString('utf8');
    try {
      const messageObj = JSON.parse(messageStr);
      
      // Check if it's a subscription confirmation
      if (messageObj.result !== undefined) {
        console.log('Subscription confirmed:', messageObj);
        return;
      }
      
      // Check if it's transaction data
      if (messageObj.params && messageObj.params.result) {
        const transaction = messageObj.params.result;
        console.log('Received transaction:', JSON.stringify(transaction, null, 2));
      }
    } catch (e) {
      console.error('Failed to parse JSON:', e);
    }
  });

  ws.on('error', function error(err: Error) {
    console.error('WebSocket error:', err);
  });

  ws.on('close', function close() {
    console.log('WebSocket is closed');
    if (pingInterval) {
      clearInterval(pingInterval);
    }
    reconnect();
  });
}

// Start the WebSocket connection
initializeWebSocket();

// Handle program termination
process.on('SIGINT', () => {
  console.log('Shutting down...');
  process.exit(0);
});
3

设置环境变量

将您的 Helius API 密钥添加为环境变量:
export HELIUS_API_KEY=your-helius-api-key
用您在仪表板中实际的 Helius API 密钥替换 your-helius-api-key
4

运行应用程序

执行脚本以开始流式传输 Pump AMM 数据:
npx ts-node enhanced-ws-pump.ts
您将在终端中看到解析后的 Pump AMM 交易。当套接字关闭时,客户端会自动重试。

主要优势

  • 浏览器兼容 - WebSocket 协议在 Node.js 和浏览器环境中均可使用
  • 丰富的数据 - 获取完全解析的交易对象,包含解码的指令和账户
  • 简单的实现 - 除了标准的 WebSocket 客户端外,不需要特殊的库
  • 自动重连 - 内置重试逻辑确保稳定连接

常见问题和解决方案

下一步

1

创建UI仪表板

使用React或Vue.js构建一个网页界面,以实时可视化传入的Pump AMM交易。
2

实现数据库存储

将交易数据存储在MongoDB或PostgreSQL等数据库中以进行历史分析:
import { MongoClient } from 'mongodb';

// Setup MongoDB connection
async function setupDatabase() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  return client.db('pump-amm').collection('transactions');
}

// Then in your message handler:
ws.on('message', async function incoming(data: WebSocket.Data) {
  const messageStr = data.toString('utf8');
  try {
    const messageObj = JSON.parse(messageStr);
    
    if (messageObj.params && messageObj.params.result) {
      const transaction = messageObj.params.result;
      
      // Store in database
      const collection = await setupDatabase();
      await collection.insertOne({
        timestamp: new Date(),
        transaction: transaction
      });
      
      console.log('Transaction stored in database');
    }
  } catch (e) {
    console.error('Failed to process message:', e);
  }
});
3

设置警报系统

使用Discord webhooks等服务配置高价值交易或特定模式的警报:
import axios from 'axios';

// Send alert to Discord webhook
async function sendAlert(message: string) {
  await axios.post('YOUR_DISCORD_WEBHOOK_URL', {
    content: message
  });
}

// Then in your message handler:
if (messageObj.params && messageObj.params.result) {
  const transaction = messageObj.params.result;
  
  // Example: Check for transactions above a certain value
  const isHighValue = checkIfHighValueTransaction(transaction);
  
  if (isHighValue) {
    sendAlert(`High-value transaction detected: ${transaction.signature}`);
  }
}
4

实现心跳监控

添加一个更强大的心跳系统以确保持续连接:
// Enhanced heartbeat system
function setupHeartbeat(ws: WebSocket) {
  let lastPongTime = Date.now();
  
  // Send ping regularly
  const pingInterval = setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.ping();
    }
  }, 30000);
  
  // Track pong responses
  ws.on('pong', () => {
    lastPongTime = Date.now();
  });
  
  // Check connection health
  const healthCheck = setInterval(() => {
    const now = Date.now();
    if (now - lastPongTime > 90000) {  // No pong for 90 seconds
      console.warn('Connection seems unresponsive, reconnecting...');
      ws.terminate();
      clearInterval(pingInterval);
      clearInterval(healthCheck);
    }
  }, 30000);
  
  return { pingInterval, healthCheck };
}