增强型 WebSocket
为什么选择增强型 WebSocket – 快速的 JSON 流,包含解码的账户和交易。无需自定义网络堆栈。
工作原理 – 连接到 Atlas 端点,订阅 Pump AMM 交易,并监听更新。示例重试五次,采用指数退避策略。
Node.js ≥ 18 (已在 v20 上测试)如果计划使用 ts‑node
运行 .ts
示例,则需要 TypeScript ≥ 5 需要 Helius 商业计划或更高 一个名为 HELIUS_API_KEY
的环境变量 ,用于存储您的 API 密钥 全局安装依赖项:npm i -g typescript ts‑node
创建 WebSocket 客户端
创建一个名为 enhanced-ws-pump.ts
的文件,并包含以下代码: // enhanced-ws-pump.ts
import WebSocket from 'ws' ;
// Configuration for reconnection
const MAX_RETRIES = 5 ;
const INITIAL_RETRY_DELAY = 1000 ; // 1 second
let retryCount = 0 ;
let retryDelay = INITIAL_RETRY_DELAY ;
// Function to create a new WebSocket connection
function createWebSocket () {
return new WebSocket ( `wss://atlas-mainnet.helius-rpc.com/?api-key= ${ process . env . HELIUS_API_KEY } ` );
}
// Function to send a request to the WebSocket server
function sendRequest ( ws : WebSocket ) {
const request = {
jsonrpc: "2.0" ,
id: 420 ,
method: "transactionSubscribe" ,
params: [
{
accountInclude: [ "pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA" ]
},
{
commitment: "processed" ,
encoding: "jsonParsed" ,
transactionDetails: "full" ,
maxSupportedTransactionVersion: 0
}
]
};
ws . send ( JSON . stringify ( request ));
}
// Function to send a ping to the WebSocket server
function startPing ( ws : WebSocket ) {
return setInterval (() => {
if ( ws . readyState === WebSocket . OPEN ) {
ws . ping ();
console . log ( 'Ping sent' );
}
}, 30000 ); // Ping every 30 seconds
}
// Function to handle reconnection
function reconnect () {
if ( retryCount >= MAX_RETRIES ) {
console . error ( 'Maximum retry attempts reached.' );
return ;
}
console . log ( `Attempting to reconnect in ${ retryDelay / 1000 } seconds... (Attempt ${ retryCount + 1 } / ${ MAX_RETRIES } )` );
setTimeout (() => {
retryCount ++ ;
retryDelay *= 2 ; // Exponential backoff
initializeWebSocket ();
}, retryDelay );
}
// Function to initialize WebSocket with all event handlers
function initializeWebSocket () {
const ws = createWebSocket ();
let pingInterval : NodeJS . Timeout ;
ws . on ( 'open' , function open () {
console . log ( 'WebSocket is open' );
retryCount = 0 ; // Reset retry count on successful connection
retryDelay = INITIAL_RETRY_DELAY ; // Reset retry delay
sendRequest ( ws );
pingInterval = startPing ( ws );
});
ws . on ( 'message' , function incoming ( data : WebSocket . Data ) {
const messageStr = data . toString ( 'utf8' );
try {
const messageObj = JSON . parse ( messageStr );
// Check if it's a subscription confirmation
if ( messageObj . result !== undefined ) {
console . log ( 'Subscription confirmed:' , messageObj );
return ;
}
// Check if it's transaction data
if ( messageObj . params && messageObj . params . result ) {
const transaction = messageObj . params . result ;
console . log ( 'Received transaction:' , JSON . stringify ( transaction , null , 2 ));
}
} catch ( e ) {
console . error ( 'Failed to parse JSON:' , e );
}
});
ws . on ( 'error' , function error ( err : Error ) {
console . error ( 'WebSocket error:' , err );
});
ws . on ( 'close' , function close () {
console . log ( 'WebSocket is closed' );
if ( pingInterval ) {
clearInterval ( pingInterval );
}
reconnect ();
});
}
// Start the WebSocket connection
initializeWebSocket ();
// Handle program termination
process . on ( 'SIGINT' , () => {
console . log ( 'Shutting down...' );
process . exit ( 0 );
});
设置环境变量
将您的 Helius API 密钥添加为环境变量: export HELIUS_API_KEY = your-helius-api-key
用您在仪表板中实际的 Helius API 密钥替换 your-helius-api-key
。
运行应用程序
执行脚本以开始流式传输 Pump AMM 数据: npx ts-node enhanced-ws-pump.ts
您将在终端中看到解析后的 Pump AMM 交易。当套接字关闭时,客户端会自动重试。
主要优势
浏览器兼容 - WebSocket 协议在 Node.js 和浏览器环境中均可使用
丰富的数据 - 获取完全解析的交易对象,包含解码的指令和账户
简单的实现 - 除了标准的 WebSocket 客户端外,不需要特殊的库
自动重连 - 内置重试逻辑确保稳定连接
常见问题和解决方案
验证您的HELIUS_API_KEY是否正确。
确保Pump AMM程序地址正确,并且程序上有活动。
实施更强大的重连逻辑或检查网络稳定性。
下一步
创建UI仪表板
使用React或Vue.js构建一个网页界面,以实时可视化传入的Pump AMM交易。
实现数据库存储
将交易数据存储在MongoDB或PostgreSQL等数据库中以进行历史分析: import { MongoClient } from 'mongodb' ;
// Setup MongoDB connection
async function setupDatabase () {
const client = new MongoClient ( 'mongodb://localhost:27017' );
await client . connect ();
return client . db ( 'pump-amm' ). collection ( 'transactions' );
}
// Then in your message handler:
ws . on ( 'message' , async function incoming ( data : WebSocket . Data ) {
const messageStr = data . toString ( 'utf8' );
try {
const messageObj = JSON . parse ( messageStr );
if ( messageObj . params && messageObj . params . result ) {
const transaction = messageObj . params . result ;
// Store in database
const collection = await setupDatabase ();
await collection . insertOne ({
timestamp: new Date (),
transaction: transaction
});
console . log ( 'Transaction stored in database' );
}
} catch ( e ) {
console . error ( 'Failed to process message:' , e );
}
});
设置警报系统
使用Discord webhooks等服务配置高价值交易或特定模式的警报: import axios from 'axios' ;
// Send alert to Discord webhook
async function sendAlert ( message : string ) {
await axios . post ( 'YOUR_DISCORD_WEBHOOK_URL' , {
content: message
});
}
// Then in your message handler:
if ( messageObj . params && messageObj . params . result ) {
const transaction = messageObj . params . result ;
// Example: Check for transactions above a certain value
const isHighValue = checkIfHighValueTransaction ( transaction );
if ( isHighValue ) {
sendAlert ( `High-value transaction detected: ${ transaction . signature } ` );
}
}
实现心跳监控
添加一个更强大的心跳系统以确保持续连接: // Enhanced heartbeat system
function setupHeartbeat ( ws : WebSocket ) {
let lastPongTime = Date . now ();
// Send ping regularly
const pingInterval = setInterval (() => {
if ( ws . readyState === WebSocket . OPEN ) {
ws . ping ();
}
}, 30000 );
// Track pong responses
ws . on ( 'pong' , () => {
lastPongTime = Date . now ();
});
// Check connection health
const healthCheck = setInterval (() => {
const now = Date . now ();
if ( now - lastPongTime > 90000 ) { // No pong for 90 seconds
console . warn ( 'Connection seems unresponsive, reconnecting...' );
ws . terminate ();
clearInterval ( pingInterval );
clearInterval ( healthCheck );
}
}, 30000 );
return { pingInterval , healthCheck };
}